Skip to content

Commit

Permalink
Simplify rebalancer's weight function (#51632)
Browse files Browse the repository at this point in the history
This commit inlines the `weightShardAdded` and `weightShardRemoved` methods
from the `BalancedShardsAllocator#WeightFunction` that respectively add and
subtract 1 (±ε) from the result of `weight`. It then follows up with a number
of simplifications that this inlining enables.

As a side-effect it also somewhat reduces the number of calls to canRebalance
and canAllocate during rebalancing when there are multiple shards of the same
index on a node that is heavier than average.
  • Loading branch information
DaveCTurner authored Jan 31, 2020
1 parent 6209962 commit 635fe34
Showing 1 changed file with 58 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;

Expand Down Expand Up @@ -205,15 +206,14 @@ public float getShardBalance() {
* </ul>
* <code>weight(node, index) = weight<sub>index</sub>(node, index) + weight<sub>node</sub>(node, index)</code>
*/
public static class WeightFunction {
private static class WeightFunction {

private final float indexBalance;
private final float shardBalance;
private final float theta0;
private final float theta1;


public WeightFunction(float indexBalance, float shardBalance) {
WeightFunction(float indexBalance, float shardBalance) {
float sum = indexBalance + shardBalance;
if (sum <= 0.0f) {
throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
Expand All @@ -224,21 +224,9 @@ public WeightFunction(float indexBalance, float shardBalance) {
this.shardBalance = shardBalance;
}

public float weight(Balancer balancer, ModelNode node, String index) {
return weight(balancer, node, index, 0);
}

public float weightShardAdded(Balancer balancer, ModelNode node, String index) {
return weight(balancer, node, index, 1);
}

public float weightShardRemoved(Balancer balancer, ModelNode node, String index) {
return weight(balancer, node, index, -1);
}

private float weight(Balancer balancer, ModelNode node, String index, int numAdditionalShards) {
final float weightShard = node.numShards() + numAdditionalShards - balancer.avgShardsPerNode();
final float weightIndex = node.numShards(index) + numAdditionalShards - balancer.avgShardsPerNode(index);
float weight(Balancer balancer, ModelNode node, String index) {
final float weightShard = node.numShards() - balancer.avgShardsPerNode();
final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index);
return theta0 * weightShard + theta1 * weightIndex;
}
}
Expand Down Expand Up @@ -377,9 +365,9 @@ private MoveDecision decideRebalance(final ShardRouting shard) {
assert currentNode != null : "currently assigned node could not be found";

// balance the shard, if a better node can be found
final float currentWeight = sorter.weight(currentNode);
final AllocationDeciders deciders = allocation.deciders();
final String idxName = shard.getIndexName();
final float currentWeight = weight.weight(this, currentNode, idxName);
final AllocationDeciders deciders = allocation.deciders();
Type rebalanceDecisionType = Type.NO;
ModelNode assignedNode = null;
List<Tuple<ModelNode, Decision>> betterBalanceNodes = new ArrayList<>();
Expand All @@ -394,7 +382,7 @@ private MoveDecision decideRebalance(final ShardRouting shard) {
// this is a comparison of the number of shards on this node to the number of shards
// that should be on each node on average (both taking the cluster as a whole into account
// as well as shards per index)
final float nodeWeight = sorter.weight(node);
final float nodeWeight = weight.weight(this, node, idxName);
// if the node we are examining has a worse (higher) weight than the node the shard is
// assigned to, then there is no way moving the shard to the node with the worse weight
// can make the balance of the cluster better, so we check for that here
Expand All @@ -408,12 +396,9 @@ private MoveDecision decideRebalance(final ShardRouting shard) {
// more even, it doesn't make sense to execute the heavyweight operation of relocating a shard unless
// the gains make it worth it, as defined by the threshold
boolean deltaAboveThreshold = lessThan(currentDelta, threshold) == false;
// simulate the weight of the node if we were to relocate the shard to it
float weightWithShardAdded = weight.weightShardAdded(this, node, idxName);
// calculate the delta of the weights of the two nodes if we were to add the shard to the
// node in question and move it away from the node that currently holds it.
float proposedDelta = weightWithShardAdded - weight.weightShardRemoved(this, currentNode, idxName);
boolean betterWeightWithShardAdded = proposedDelta < currentDelta;
boolean betterWeightWithShardAdded = nodeWeight + 1.0f < currentWeight;
rebalanceConditionsMet = deltaAboveThreshold && betterWeightWithShardAdded;
// if the simulated weight delta with the shard moved away is better than the weight delta
// with the shard remaining on the current node, and we are allowed to allocate to the
Expand Down Expand Up @@ -538,9 +523,18 @@ private void balanceByWeights() {
logger.trace("Balancing from node [{}] weight: [{}] to node [{}] weight: [{}] delta: [{}]",
maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta);
}
/* pass the delta to the replication function to prevent relocations that only swap the weights of the two nodes.
* a relocation must bring us closer to the balance if we only achieve the same delta the relocation is useless */
if (tryRelocateShard(minNode, maxNode, index, delta)) {
if (delta <= 1.0f) {
/*
* prevent relocations that only swap the weights of the two nodes. a relocation must bring us closer to the
* balance if we only achieve the same delta the relocation is useless
*
* NB this comment above was preserved from an earlier version but doesn't obviously describe the code today. We
* already know that lessThan(delta, threshold) == false and threshold defaults to 1.0, so by default we never
* hit this case anyway.
*/
logger.trace("Couldn't find shard to relocate from node [{}] to node [{}]",
maxNode.getNodeId(), minNode.getNodeId());
} else if (tryRelocateShard(minNode, maxNode, index)) {
/*
* TODO we could be a bit smarter here, we don't need to fully sort necessarily
* we could just find the place to insert linearly but the win might be minor
Expand Down Expand Up @@ -666,12 +660,12 @@ public void moveShards() {
* to the {@link MoveDecision} return object:
* 1. If the shard is not started, no decision will be taken and {@link MoveDecision#isDecisionTaken()} will return false.
* 2. If the shard is allowed to remain on its current node, no attempt will be made to move the shard and
* {@link MoveDecision#canRemainDecision} will have a decision type of YES. All other fields in the object will be null.
* {@link MoveDecision#getCanRemainDecision} will have a decision type of YES. All other fields in the object will be null.
* 3. If the shard is not allowed to remain on its current node, then {@link MoveDecision#getAllocationDecision()} will be
* populated with the decision of moving to another node. If {@link MoveDecision#forceMove()} ()} returns {@code true}, then
* {@link MoveDecision#targetNode} will return a non-null value, otherwise the assignedNodeId will be null.
* {@link MoveDecision#getTargetNode} will return a non-null value, otherwise the assignedNodeId will be null.
* 4. If the method is invoked in explain mode (e.g. from the cluster allocation explain APIs), then
* {@link MoveDecision#nodeDecisions} will have a non-null value.
* {@link MoveDecision#getNodeDecisions} will have a non-null value.
*/
public MoveDecision decideMove(final ShardRouting shardRouting) {
if (shardRouting.started() == false) {
Expand Down Expand Up @@ -915,8 +909,8 @@ private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting s
continue;
}

// simulate weight if we would add shard to node
float currentWeight = weight.weightShardAdded(this, node, shard.getIndexName());
// weight of this index currently on the node
float currentWeight = weight.weight(this, node, shard.getIndexName());
// moving the shard would not improve the balance, and we are not in explain mode, so short circuit
if (currentWeight > minWeight && explain == false) {
continue;
Expand Down Expand Up @@ -985,66 +979,54 @@ private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting s
);
}

private static final Comparator<ShardRouting> BY_DESCENDING_SHARD_ID = Comparator.comparing(ShardRouting::shardId).reversed();

/**
* Tries to find a relocation from the max node to the minimal node for an arbitrary shard of the given index on the
* balance model. Iff this method returns a <code>true</code> the relocation has already been executed on the
* simulation model as well as on the cluster.
*/
private boolean tryRelocateShard(ModelNode minNode, ModelNode maxNode, String idx, float minCost) {
private boolean tryRelocateShard(ModelNode minNode, ModelNode maxNode, String idx) {
final ModelIndex index = maxNode.getIndex(idx);
Decision decision = null;
if (index != null) {
if (logger.isTraceEnabled()) {
logger.trace("Try relocating shard for index index [{}] from node [{}] to node [{}]", idx, maxNode.getNodeId(),
minNode.getNodeId());
}
ShardRouting candidate = null;
logger.trace("Try relocating shard of [{}] from [{}] to [{}]", idx, maxNode.getNodeId(), minNode.getNodeId());
final Iterable<ShardRouting> shardRoutings = StreamSupport.stream(index.spliterator(), false)
.filter(ShardRouting::started) // cannot rebalance unassigned, initializing or relocating shards anyway
.filter(maxNode::containsShard)
.sorted(BY_DESCENDING_SHARD_ID) // check in descending order of shard id so that the decision is deterministic
::iterator;

final AllocationDeciders deciders = allocation.deciders();
for (ShardRouting shard : index) {
if (shard.started()) {
// skip initializing, unassigned and relocating shards we can't relocate them anyway
Decision allocationDecision = deciders.canAllocate(shard, minNode.getRoutingNode(), allocation);
Decision rebalanceDecision = deciders.canRebalance(shard, allocation);
if (((allocationDecision.type() == Type.YES) || (allocationDecision.type() == Type.THROTTLE))
&& ((rebalanceDecision.type() == Type.YES) || (rebalanceDecision.type() == Type.THROTTLE))) {
if (maxNode.containsShard(shard)) {
// simulate moving shard from maxNode to minNode
final float delta = weight.weightShardAdded(
this, minNode, idx) - weight.weightShardRemoved(this, maxNode, idx);
if (delta < minCost ||
(candidate != null && Float.compare(delta, minCost) == 0 && candidate.id() > shard.id())) {
/* this last line is a tie-breaker to make the shard allocation alg deterministic
* otherwise we rely on the iteration order of the index.getAllShards() which is a set.*/
minCost = delta;
candidate = shard;
decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
}
}
}
for (ShardRouting shard : shardRoutings) {
final Decision rebalanceDecision = deciders.canRebalance(shard, allocation);
if (rebalanceDecision.type() == Type.NO) {
continue;
}
}
final Decision allocationDecision = deciders.canAllocate(shard, minNode.getRoutingNode(), allocation);
if (allocationDecision.type() == Type.NO) {
continue;
}

final Decision decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);

if (candidate != null) {
/* allocate on the model even if not throttled */
maxNode.removeShard(candidate);
long shardSize = allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
maxNode.removeShard(shard);
long shardSize = allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);

if (decision.type() == Type.YES) { /* only allocate on the cluster if we are not throttled */
logger.debug("Relocate shard [{}] from node [{}] to node [{}]", candidate, maxNode.getNodeId(),
minNode.getNodeId());
/* now allocate on the cluster */
minNode.addShard(routingNodes.relocateShard(candidate, minNode.getNodeId(), shardSize, allocation.changes()).v1());
if (decision.type() == Type.YES) {
/* only allocate on the cluster if we are not throttled */
logger.debug("Relocate [{}] from [{}] to [{}]", shard, maxNode.getNodeId(), minNode.getNodeId());
minNode.addShard(routingNodes.relocateShard(shard, minNode.getNodeId(), shardSize, allocation.changes()).v1());
return true;
} else {
/* allocate on the model even if throttled */
logger.debug("Simulate relocation of [{}] from [{}] to [{}]", shard, maxNode.getNodeId(), minNode.getNodeId());
assert decision.type() == Type.THROTTLE;
minNode.addShard(candidate.relocate(minNode.getNodeId(), shardSize));
minNode.addShard(shard.relocate(minNode.getNodeId(), shardSize));
return false;
}
}
}
if (logger.isTraceEnabled()) {
logger.trace("Couldn't find shard to relocate from node [{}] to node [{}] allocation decision [{}]",
maxNode.getNodeId(), minNode.getNodeId(), decision == null ? "NO" : decision.type().name());
}
logger.trace("No shards of [{}] can relocate from [{}] to [{}]", idx, maxNode.getNodeId(), minNode.getNodeId());
return false;
}

Expand Down

0 comments on commit 635fe34

Please sign in to comment.