Skip to content

Commit

Permalink
Check allocation id when failing shard on recovery (#50656)
Browse files Browse the repository at this point in the history
A failure of a recovering shard can race with a new allocation of the shard, and cause the new
allocation to be failed as well. This can result in a shard being marked as initializing in the cluster
state, but not exist on the node anymore.

Closes #50508
  • Loading branch information
ywelsch committed Jan 7, 2020
1 parent e881dc2 commit f694266
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,8 @@ public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, bo
}
}

private synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolean sendShardFailure, Exception failure) {
// package-private for testing
synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolean sendShardFailure, Exception failure) {
failAndRemoveShard(shardRouting, sendShardFailure, "failed recovery", failure, clusterService.state());
}

Expand All @@ -726,7 +727,10 @@ private void failAndRemoveShard(ShardRouting shardRouting, boolean sendShardFail
try {
AllocatedIndex<? extends Shard> indexService = indicesService.indexService(shardRouting.shardId().getIndex());
if (indexService != null) {
indexService.removeShard(shardRouting.shardId().id(), message);
Shard shard = indexService.getShardOrNull(shardRouting.shardId().id());
if (shard != null && shard.routingEntry().isSameAllocation(shardRouting)) {
indexService.removeShard(shardRouting.shardId().id(), message);
}
}
} catch (ShardNotFoundException e) {
// the node got closed on us, ignore it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,40 @@ public void testInitializingPrimaryRemovesInitializingReplicaWithSameAID() {

}

public void testRecoveryFailures() {
disableRandomFailures();
String index = "index_" + randomAlphaOfLength(8).toLowerCase(Locale.ROOT);
ClusterState state = ClusterStateCreationUtils.state(index, randomBoolean(),
ShardRoutingState.STARTED, ShardRoutingState.INITIALIZING);

// the initial state which is derived from the newly created cluster state but doesn't contain the index
ClusterState previousState = ClusterState.builder(state)
.metaData(MetaData.builder(state.metaData()).remove(index))
.routingTable(RoutingTable.builder().build())
.build();

// pick a data node to simulate the adding an index cluster state change event on, that has shards assigned to it
final ShardRouting shardRouting = state.routingTable().index(index).shard(0).replicaShards().get(0);
final ShardId shardId = shardRouting.shardId();
DiscoveryNode node = state.nodes().get(shardRouting.currentNodeId());

// simulate the cluster state change on the node
ClusterState localState = adaptClusterStateToLocalNode(state, node);
ClusterState previousLocalState = adaptClusterStateToLocalNode(previousState, node);
IndicesClusterStateService indicesCSSvc = createIndicesClusterStateService(node, RecordingIndicesService::new);
indicesCSSvc.start();
indicesCSSvc.applyClusterState(new ClusterChangedEvent("cluster state change that adds the index", localState, previousLocalState));

assertNotNull(indicesCSSvc.indicesService.getShardOrNull(shardId));

// check that failing unrelated allocation does not remove shard
indicesCSSvc.handleRecoveryFailure(shardRouting.reinitializeReplicaShard(), false, new Exception("dummy"));
assertNotNull(indicesCSSvc.indicesService.getShardOrNull(shardId));

indicesCSSvc.handleRecoveryFailure(shardRouting, false, new Exception("dummy"));
assertNull(indicesCSSvc.indicesService.getShardOrNull(shardId));
}

public ClusterState randomInitialClusterState(Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap,
Supplier<MockIndicesService> indicesServiceSupplier) {
List<DiscoveryNode> allNodes = new ArrayList<>();
Expand Down

0 comments on commit f694266

Please sign in to comment.