Skip to content

Commit

Permalink
Add NodeVersionAllocationDecider check
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <poojiraj@amazon.com>
  • Loading branch information
Poojita-Raj committed Aug 4, 2023
1 parent abc517a commit 9d22b92
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(
addAllocationDecider(deciders, new ConcurrentRebalanceAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ConcurrentRecoveriesAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new EnableAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new NodeVersionAllocationDecider());
addAllocationDecider(deciders, new NodeVersionAllocationDecider(settings));
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,18 @@

package org.opensearch.cluster.routing.allocation.decider;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.settings.Settings;
import org.opensearch.indices.replication.common.ReplicationType;

import java.util.List;
import java.util.stream.Collectors;

/**
* An allocation decider that prevents relocation or allocation from nodes
Expand All @@ -52,9 +58,35 @@ public class NodeVersionAllocationDecider extends AllocationDecider {

public static final String NAME = "node_version";

private final ReplicationType replicationType;

/**
 * Creates the decider and captures the replication type resolved from the given settings.
 * The value is read once here and kept for the lifetime of this instance.
 *
 * NOTE(review): the setting constant lives on {@code IndexMetadata}, which suggests it is index-scoped;
 * confirm that resolving it from the settings handed to this constructor is the intended source.
 *
 * @param settings settings from which {@code IndexMetadata.INDEX_REPLICATION_TYPE_SETTING} is resolved
 */
public NodeVersionAllocationDecider(Settings settings) {
    this.replicationType = IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(settings);
}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (shardRouting.primary()) {
if (replicationType == ReplicationType.SEGMENT) {
List<ShardRouting> replicas = allocation.routingNodes()
.assignedShards(shardRouting.shardId())
.stream()
.filter(shr -> !shr.primary() && shr.active())
.collect(Collectors.toList());
for (ShardRouting replica : replicas) {
// can not allocate if target node version > any existing replica version
RoutingNode replicaNode = allocation.routingNodes().node(replica.currentNodeId());
if (node.node().getVersion().after(replicaNode.node().getVersion())) {
return allocation.decision(
Decision.NO,
NAME,
"When segment replication is enabled, cannot relocate primary shard to a node with version [%s] if it has a replica on older version [%s]",
node.node().getVersion(),
replicaNode.node().getVersion()
);
}
}
}
if (shardRouting.currentNodeId() == null) {
if (shardRouting.recoverySource() != null && shardRouting.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) {
// restoring from a snapshot - check that the node can handle the version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.opensearch.common.util.set.Sets;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.IndexId;
import org.opensearch.snapshots.EmptySnapshotsInfoService;
import org.opensearch.snapshots.InternalSnapshotsInfoService;
Expand Down Expand Up @@ -439,7 +440,9 @@ public void testRebalanceDoesNotAllocatePrimaryAndReplicasOnDifferentVersionNode
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().add(newNode).add(oldNode1).add(oldNode2))
.build();
AllocationDeciders allocationDeciders = new AllocationDeciders(Collections.singleton(new NodeVersionAllocationDecider()));
AllocationDeciders allocationDeciders = new AllocationDeciders(
Collections.singleton(new NodeVersionAllocationDecider(Settings.EMPTY))
);
AllocationService strategy = new MockAllocationService(
allocationDeciders,
new TestGatewayAllocator(),
Expand Down Expand Up @@ -509,7 +512,7 @@ public void testRestoreDoesNotAllocateSnapshotOnOlderNodes() {
.nodes(DiscoveryNodes.builder().add(newNode).add(oldNode1).add(oldNode2))
.build();
AllocationDeciders allocationDeciders = new AllocationDeciders(
Arrays.asList(new ReplicaAfterPrimaryActiveAllocationDecider(), new NodeVersionAllocationDecider())
Arrays.asList(new ReplicaAfterPrimaryActiveAllocationDecider(), new NodeVersionAllocationDecider(Settings.EMPTY))
);
AllocationService strategy = new MockAllocationService(
allocationDeciders,
Expand All @@ -526,6 +529,148 @@ public void testRestoreDoesNotAllocateSnapshotOnOlderNodes() {
}
}

/**
 * Checks rebalancing when the decider is constructed with segment replication settings.
 *
 * The cluster has two nodes on {@code Version.CURRENT} and two on the previous version. Two single-shard
 * indices each have a started primary on one previous-version node and a started replica on the other.
 * After a reroute, each index must have exactly one relocating shard, and that shard must be a replica.
 */
public void testRebalanceDoesNotAllocatePrimaryOnHigherVersionNodesSegrepEnabled() {
    final ShardId firstShard = new ShardId("test1", "_na_", 0);
    final ShardId secondShard = new ShardId("test2", "_na_", 0);
    final Version previousVersion = VersionUtils.getPreviousVersion();

    // Two nodes on the current version ...
    final DiscoveryNode currentNodeA = new DiscoveryNode(
        "newNode1",
        buildNewFakeTransportAddress(),
        emptyMap(),
        CLUSTER_MANAGER_DATA_ROLES,
        Version.CURRENT
    );
    final DiscoveryNode currentNodeB = new DiscoveryNode(
        "newNode2",
        buildNewFakeTransportAddress(),
        emptyMap(),
        CLUSTER_MANAGER_DATA_ROLES,
        Version.CURRENT
    );
    // ... and two nodes on the previous version, which initially hold every shard copy.
    final DiscoveryNode previousNodeA = new DiscoveryNode(
        "oldNode1",
        buildNewFakeTransportAddress(),
        emptyMap(),
        CLUSTER_MANAGER_DATA_ROLES,
        previousVersion
    );
    final DiscoveryNode previousNodeB = new DiscoveryNode(
        "oldNode2",
        buildNewFakeTransportAddress(),
        emptyMap(),
        CLUSTER_MANAGER_DATA_ROLES,
        previousVersion
    );

    final AllocationId firstPrimaryId = AllocationId.newInitializing();
    final AllocationId firstReplicaId = AllocationId.newInitializing();
    final AllocationId secondPrimaryId = AllocationId.newInitializing();
    final AllocationId secondReplicaId = AllocationId.newInitializing();

    // The same settings are applied to both indices and handed to the decider under test.
    final Settings segRepSettings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();

    final IndexMetadata.Builder firstIndexMetadata = IndexMetadata.builder(firstShard.getIndexName())
        .settings(settings(Version.CURRENT).put(segRepSettings))
        .numberOfShards(1)
        .numberOfReplicas(1)
        .putInSyncAllocationIds(0, Sets.newHashSet(firstPrimaryId.getId(), firstReplicaId.getId()));
    final IndexMetadata.Builder secondIndexMetadata = IndexMetadata.builder(secondShard.getIndexName())
        .settings(settings(Version.CURRENT).put(segRepSettings))
        .numberOfShards(1)
        .numberOfReplicas(1)
        .putInSyncAllocationIds(0, Sets.newHashSet(secondPrimaryId.getId(), secondReplicaId.getId()));
    final Metadata clusterMetadata = Metadata.builder().put(firstIndexMetadata).put(secondIndexMetadata).build();

    // First index: primary on oldNode1, replica on oldNode2.
    final IndexShardRoutingTable firstShardTable = new IndexShardRoutingTable.Builder(firstShard).addShard(
        TestShardRouting.newShardRouting(
            firstShard.getIndexName(),
            firstShard.getId(),
            previousNodeA.getId(),
            null,
            true,
            ShardRoutingState.STARTED,
            firstPrimaryId
        )
    )
        .addShard(
            TestShardRouting.newShardRouting(
                firstShard.getIndexName(),
                firstShard.getId(),
                previousNodeB.getId(),
                null,
                false,
                ShardRoutingState.STARTED,
                firstReplicaId
            )
        )
        .build();
    // Second index mirrors the first: primary on oldNode2, replica on oldNode1.
    final IndexShardRoutingTable secondShardTable = new IndexShardRoutingTable.Builder(secondShard).addShard(
        TestShardRouting.newShardRouting(
            secondShard.getIndexName(),
            secondShard.getId(),
            previousNodeB.getId(),
            null,
            true,
            ShardRoutingState.STARTED,
            secondPrimaryId
        )
    )
        .addShard(
            TestShardRouting.newShardRouting(
                secondShard.getIndexName(),
                secondShard.getId(),
                previousNodeA.getId(),
                null,
                false,
                ShardRoutingState.STARTED,
                secondReplicaId
            )
        )
        .build();
    final RoutingTable initialRouting = RoutingTable.builder()
        .add(IndexRoutingTable.builder(firstShard.getIndex()).addIndexShard(firstShardTable))
        .add(IndexRoutingTable.builder(secondShard.getIndex()).addIndexShard(secondShardTable))
        .build();

    ClusterState clusterState = ClusterState.builder(
        org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)
    )
        .metadata(clusterMetadata)
        .routingTable(initialRouting)
        .nodes(DiscoveryNodes.builder().add(currentNodeA).add(currentNodeB).add(previousNodeA).add(previousNodeB))
        .build();

    // NodeVersionAllocationDecider is the only decider in play, configured for segment replication.
    final AllocationDeciders deciders = new AllocationDeciders(
        Collections.singleton(new NodeVersionAllocationDecider(segRepSettings))
    );
    final AllocationService allocationService = new MockAllocationService(
        deciders,
        new TestGatewayAllocator(),
        new BalancedShardsAllocator(Settings.EMPTY),
        EmptyClusterInfoService.INSTANCE,
        EmptySnapshotsInfoService.INSTANCE
    );
    clusterState = allocationService.reroute(clusterState, new AllocationCommands(), true, false).getClusterState();

    // With segment replication the decider refuses to place a primary on a node newer than the node of any
    // active replica, so for each index exactly one shard relocates and it must be the replica, never the primary.
    final IndexRoutingTable secondIndexRouting = clusterState.routingTable().index(secondShard.getIndex());
    assertThat(secondIndexRouting.shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(1));
    assertThat(secondIndexRouting.shardsWithState(ShardRoutingState.RELOCATING).get(0).primary(), equalTo(false));

    final IndexRoutingTable firstIndexRouting = clusterState.routingTable().index(firstShard.getIndex());
    assertThat(firstIndexRouting.shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(1));
    assertThat(firstIndexRouting.shardsWithState(ShardRoutingState.RELOCATING).get(0).primary(), equalTo(false));
}

private ClusterState stabilize(ClusterState clusterState, AllocationService service) {
logger.trace("RoutingNodes: {}", clusterState.getRoutingNodes());

Expand Down Expand Up @@ -626,7 +771,7 @@ public void testMessages() {
RoutingAllocation routingAllocation = new RoutingAllocation(null, clusterState.getRoutingNodes(), clusterState, null, null, 0);
routingAllocation.debugDecision(true);

final NodeVersionAllocationDecider allocationDecider = new NodeVersionAllocationDecider();
final NodeVersionAllocationDecider allocationDecider = new NodeVersionAllocationDecider(Settings.EMPTY);
Decision decision = allocationDecider.canAllocate(primaryShard, newNode, routingAllocation);
assertThat(decision.type(), is(Decision.Type.YES));
assertThat(decision.getExplanation(), is("the primary shard is new or already existed on the node"));
Expand Down

0 comments on commit 9d22b92

Please sign in to comment.