Skip to content

Commit

Permalink
Cannot force allocate primary to a node where the shard already exists (
Browse files Browse the repository at this point in the history
#22031)

Before, it was possible that the SameShardAllocationDecider would allow
force allocation of an unassigned primary to the same node on which an
active replica is assigned.  This could only happen with shadow replica
indices, because when a shadow replica primary fails, the replica gets
promoted to primary but in the INITIALIZED state, not in the STARTED
state (because the engine has specific reinitialization that must take
place in the case of shadow replicas).  Therefore, if the now promoted
primary that is initializing fails also, the primary will be in the
unassigned state, because replica to primary promotion only happens when
the failed shard was in the started state.  The now unassigned primary
shard will go through the allocation deciders, where the
SameShardsAllocationDecider would return a NO decision, but would still
permit force allocation on the primary if all deciders returned NO.

This commit implements canForceAllocatePrimary on the
SameShardAllocationDecider, which ensures that a primary cannot be
force allocated to the same node on which an active replica already
exists.
  • Loading branch information
Ali Beyad authored Dec 8, 2016
1 parent 8fe4bc1 commit 3da0429
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,54 @@ public SameShardAllocationDecider(Settings settings) {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
Iterable<ShardRouting> assignedShards = allocation.routingNodes().assignedShards(shardRouting.shardId());
Decision decision = decideSameNode(shardRouting, node, allocation, assignedShards);
if (decision.type() == Decision.Type.NO || sameHost == false) {
// if its already a NO decision looking at the node, or we aren't configured to look at the host, return the decision
return decision;
}
if (node.node() != null) {
for (RoutingNode checkNode : allocation.routingNodes()) {
if (checkNode.node() == null) {
continue;
}
// check if its on the same host as the one we want to allocate to
boolean checkNodeOnSameHostName = false;
boolean checkNodeOnSameHostAddress = false;
if (Strings.hasLength(checkNode.node().getHostAddress()) && Strings.hasLength(node.node().getHostAddress())) {
if (checkNode.node().getHostAddress().equals(node.node().getHostAddress())) {
checkNodeOnSameHostAddress = true;
}
} else if (Strings.hasLength(checkNode.node().getHostName()) && Strings.hasLength(node.node().getHostName())) {
if (checkNode.node().getHostName().equals(node.node().getHostName())) {
checkNodeOnSameHostName = true;
}
}
if (checkNodeOnSameHostAddress || checkNodeOnSameHostName) {
for (ShardRouting assignedShard : assignedShards) {
if (checkNode.nodeId().equals(assignedShard.currentNodeId())) {
String hostType = checkNodeOnSameHostAddress ? "address" : "name";
String host = checkNodeOnSameHostAddress ? node.node().getHostAddress() : node.node().getHostName();
return allocation.decision(Decision.NO, NAME,
"the shard cannot be allocated on host %s [%s], where it already exists on node [%s]; " +
"set [%s] to false to allow multiple nodes on the same host to hold the same shard copies",
hostType, host, node.nodeId(), CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING.getKey());
}
}
}
}
}
return allocation.decision(Decision.YES, NAME, "the shard does not exist on the same host");
}

@Override
public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
assert shardRouting.primary() : "must not call force allocate on a non-primary shard";
Iterable<ShardRouting> assignedShards = allocation.routingNodes().assignedShards(shardRouting.shardId());
return decideSameNode(shardRouting, node, allocation, assignedShards);
}

private Decision decideSameNode(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation,
Iterable<ShardRouting> assignedShards) {
for (ShardRouting assignedShard : assignedShards) {
if (node.nodeId().equals(assignedShard.currentNodeId())) {
if (assignedShard.isSameAllocation(shardRouting)) {
Expand All @@ -72,39 +120,6 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
}
}
}
if (sameHost) {
if (node.node() != null) {
for (RoutingNode checkNode : allocation.routingNodes()) {
if (checkNode.node() == null) {
continue;
}
// check if its on the same host as the one we want to allocate to
boolean checkNodeOnSameHostName = false;
boolean checkNodeOnSameHostAddress = false;
if (Strings.hasLength(checkNode.node().getHostAddress()) && Strings.hasLength(node.node().getHostAddress())) {
if (checkNode.node().getHostAddress().equals(node.node().getHostAddress())) {
checkNodeOnSameHostAddress = true;
}
} else if (Strings.hasLength(checkNode.node().getHostName()) && Strings.hasLength(node.node().getHostName())) {
if (checkNode.node().getHostName().equals(node.node().getHostName())) {
checkNodeOnSameHostName = true;
}
}
if (checkNodeOnSameHostAddress || checkNodeOnSameHostName) {
for (ShardRouting assignedShard : assignedShards) {
if (checkNode.nodeId().equals(assignedShard.currentNodeId())) {
String hostType = checkNodeOnSameHostAddress ? "address" : "name";
String host = checkNodeOnSameHostAddress ? node.node().getHostAddress() : node.node().getHostName();
return allocation.decision(Decision.NO, NAME,
"the shard cannot be allocated on host %s [%s], where it already exists on node [%s]; " +
"set [%s] to false to allow multiple nodes on the same host to hold the same shard copies",
hostType, host, node.nodeId(), CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING.getKey());
}
}
}
}
}
}
return allocation.decision(Decision.YES, NAME, "the shard does not exist on the same " + (sameHost ? "host" : "node"));
return allocation.decision(Decision.YES, NAME, "the shard does not exist on the same node");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,29 @@

import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;

import java.util.Collections;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
Expand Down Expand Up @@ -86,4 +96,31 @@ public void testSameHost() {
assertThat(shardRouting.currentNodeId(), equalTo("node3"));
}
}

public void testForceAllocatePrimaryOnSameNodeNotAllowed() {
SameShardAllocationDecider decider = new SameShardAllocationDecider(Settings.EMPTY);
ClusterState clusterState = ClusterStateCreationUtils.state("idx", randomIntBetween(2, 4), 1);
Index index = clusterState.getMetaData().index("idx").getIndex();
ShardRouting primaryShard = clusterState.routingTable().index(index).shard(0).primaryShard();
RoutingNode routingNode = clusterState.getRoutingNodes().node(primaryShard.currentNodeId());
RoutingAllocation routingAllocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, Collections.emptyList()),
new RoutingNodes(clusterState, false), clusterState, ClusterInfo.EMPTY, System.nanoTime(), false
);

// can't force allocate same shard copy to the same node
ShardRouting newPrimary = TestShardRouting.newShardRouting(primaryShard.shardId(), null, true, ShardRoutingState.UNASSIGNED);
Decision decision = decider.canForceAllocatePrimary(newPrimary, routingNode, routingAllocation);
assertEquals(Decision.Type.NO, decision.type());

// can force allocate to a different node
RoutingNode unassignedNode = null;
for (RoutingNode node : clusterState.getRoutingNodes()) {
if (node.isEmpty()) {
unassignedNode = node;
break;
}
}
decision = decider.canForceAllocatePrimary(newPrimary, unassignedNode, routingAllocation);
assertEquals(Decision.Type.YES, decision.type());
}
}

0 comments on commit 3da0429

Please sign in to comment.