diff --git a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java
index 98564312e6bc0..77a6ecf5317ac 100644
--- a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java
+++ b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java
@@ -7,12 +7,19 @@
 
 package org.elasticsearch.xpack.shutdown;
 
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.Build;
 import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
@@ -20,6 +27,7 @@
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 
 import static org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Status.COMPLETE;
 import static org.hamcrest.Matchers.equalTo;
 
@@ -134,6 +142,71 @@ public void testShardStatusIsCompleteOnNonDataNodes() throws Exception {
         assertThat(getResp.getShutdownStatuses().get(0).migrationStatus().getStatus(), equalTo(COMPLETE));
     }
 
+    /**
+     * Checks that, if we get to a situation where a shard can't move because all other nodes already have a copy of that shard,
+     * we'll still return COMPLETE instead of STALLED.
+     */
+    public void testNotStalledIfAllShardsHaveACopyOnAnotherNode() throws Exception {
+        internalCluster().startNodes(2);
+
+        final String indexName = "test";
+        prepareCreate(indexName).setSettings(
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) // <- Ensure we have a copy of the shard on both nodes
+                .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), 0) // Disable "normal" delayed allocation
+        ).get();
+        ensureGreen(indexName);
+        indexRandomData();
+
+        String nodeToStopId = findIdOfNodeWithPrimaryShard(indexName);
+        PutShutdownNodeAction.Request putShutdownRequest = new PutShutdownNodeAction.Request(
+            nodeToStopId,
+            SingleNodeShutdownMetadata.Type.REMOVE,
+            this.getTestName(),
+            null
+        );
+        AcknowledgedResponse putShutdownResponse = client().execute(PutShutdownNodeAction.INSTANCE, putShutdownRequest).get();
+        assertTrue(putShutdownResponse.isAcknowledged());
+
+        assertBusy(() -> {
+            GetShutdownStatusAction.Response getResp = client().execute(
+                GetShutdownStatusAction.INSTANCE,
+                new GetShutdownStatusAction.Request(nodeToStopId)
+            ).get();
+
+            assertThat(getResp.getShutdownStatuses().get(0).migrationStatus().getStatus(), equalTo(COMPLETE));
+        });
+    }
+
+    private void indexRandomData() throws Exception {
+        int numDocs = scaledRandomIntBetween(100, 1000);
+        IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
+        for (int i = 0; i < builders.length; i++) {
+            builders[i] = client().prepareIndex("test", "_doc").setSource("field", "value");
+        }
+        indexRandom(true, builders);
+    }
+
+    private String findIdOfNodeWithPrimaryShard(String indexName) {
+        ClusterState state = client().admin().cluster().prepareState().get().getState();
+        List<ShardRouting> startedShards = state.routingTable().shardsWithState(ShardRoutingState.STARTED);
+        return startedShards.stream()
+            .filter(ShardRouting::primary)
+            .filter(shardRouting -> indexName.equals(shardRouting.index().getName()))
+            .map(ShardRouting::currentNodeId)
+            .findFirst()
+            .orElseThrow(
+                () -> new AssertionError(
+                    new ParameterizedMessage(
+                        "could not find a primary shard of index [{}] in list of started shards [{}]",
+                        indexName,
+                        startedShards
+                    )
+                )
+            );
+    }
+
     private String getNodeId(String nodeName) throws Exception {
         NodesInfoResponse nodes = client().admin().cluster().prepareNodesInfo().clear().get();
         return nodes.getNodes()
diff --git a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusAction.java b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusAction.java
index b8a04207d7188..49fa31590e04d 100644
--- a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusAction.java
+++ b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusAction.java
@@ -28,6 +28,7 @@
 import org.elasticsearch.cluster.routing.allocation.AllocationDecision;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
 import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
@@ -44,6 +45,8 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 public class TransportGetShutdownStatusAction extends TransportMasterNodeAction<
@@ -220,8 +223,13 @@ static ShutdownShardMigrationStatus shardMigrationStatus(
         );
         allocation.setDebugMode(RoutingAllocation.DebugMode.EXCLUDE_YES_DECISIONS);
 
+        // We also need the set of node IDs which are currently shutting down.
+        Set<String> shuttingDownNodes = currentState.metadata().nodeShutdowns().keySet();
+
+        AtomicInteger shardsToIgnoreForFinalStatus = new AtomicInteger(0);
+
         // Explain shard allocations until we find one that can't move, then stop (as `findFirst` short-circuits)
-        final Optional<ShardRouting> unmovableShard = currentState.getRoutingNodes()
+        Optional<Tuple<ShardRouting, ShardAllocationDecision>> unmovableShard = currentState.getRoutingNodes()
             .node(nodeId)
             .shardsWithState(ShardRoutingState.STARTED)
             .stream()
@@ -236,6 +244,21 @@ static ShutdownShardMigrationStatus shardMigrationStatus(
             .filter(pair -> pair.v2().getMoveDecision().getAllocationDecision().equals(AllocationDecision.THROTTLED) == false)
             // These shards will move as soon as possible
             .filter(pair -> pair.v2().getMoveDecision().getAllocationDecision().equals(AllocationDecision.YES) == false)
+            // If the shard that can't move is on every node in the cluster, we shouldn't be `STALLED` on it.
+            .filter(pair -> {
+                final boolean hasShardCopyOnOtherNode = currentState.routingTable()
+                    .allShards(pair.v1().index().getName())
+                    .stream()
+                    .filter(shardRouting -> shardRouting.id() == pair.v1().id())
+                    // If any shards are both 1) `STARTED` and 2) are not on a node that's shutting down, we have at least one copy
+                    // of this shard safely on a node that's not shutting down, so we don't want to report `STALLED` because of this shard.
+                    .filter(ShardRouting::started)
+                    .anyMatch(routing -> shuttingDownNodes.contains(routing.currentNodeId()) == false);
+                if (hasShardCopyOnOtherNode) {
+                    shardsToIgnoreForFinalStatus.incrementAndGet();
+                }
+                return hasShardCopyOnOtherNode == false;
+            })
             .peek(pair -> {
                 if (logger.isTraceEnabled()) { // don't serialize the decision unless we have to
                     logger.trace(
@@ -249,12 +272,19 @@ static ShutdownShardMigrationStatus shardMigrationStatus(
                     );
                 }
             })
-            .map(Tuple::v1)
             .findFirst();
 
-        if (unmovableShard.isPresent()) {
+        if (totalRemainingShards == shardsToIgnoreForFinalStatus.get() && unmovableShard.isPresent() == false) {
+            return new ShutdownShardMigrationStatus(
+                SingleNodeShutdownMetadata.Status.COMPLETE,
+                0,
+                "["
+                    + shardsToIgnoreForFinalStatus.get()
+                    + "] shards cannot be moved away from this node but have at least one copy on another node in the cluster"
+            );
+        } else if (unmovableShard.isPresent()) {
             // We found a shard that can't be moved, so shard relocation is stalled. Blame the unmovable shard.
-            ShardRouting shardRouting = unmovableShard.get();
+            ShardRouting shardRouting = unmovableShard.get().v1();
 
             return new ShutdownShardMigrationStatus(
                 SingleNodeShutdownMetadata.Status.STALLED,
@@ -267,7 +297,6 @@
                 ).getFormattedMessage()
             );
         } else {
-            // We couldn't find any shards that can't be moved, so we're just waiting for other recoveries or initializing shards
             return new ShutdownShardMigrationStatus(SingleNodeShutdownMetadata.Status.IN_PROGRESS, totalRemainingShards);
         }
     }
diff --git a/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusActionTests.java b/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusActionTests.java
index cd4435db25c8b..085c462f84dc3 100644
--- a/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusActionTests.java
+++ b/x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusActionTests.java
@@ -365,6 +365,46 @@ public void testStalled() {
         );
     }
 
+    public void testNotStalledIfAllShardsHaveACopyOnAnotherNode() {
+        Index index = new Index(randomAlphaOfLength(5), randomAlphaOfLengthBetween(1, 20));
+        IndexMetadata imd = generateIndexMetadata(index, 3, 0);
+        IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(index)
+            .addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), LIVE_NODE_ID, false, ShardRoutingState.STARTED))
+            .addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), SHUTTING_DOWN_NODE_ID, true, ShardRoutingState.STARTED))
+            .build();
+
+        // Force a decision of NO for all moves and new allocations, simulating a decider that's stuck
+        canAllocate.set((r, n, a) -> Decision.NO);
+        // And the remain decider simulates NodeShutdownAllocationDecider
+        canRemain.set((r, n, a) -> n.nodeId().equals(SHUTTING_DOWN_NODE_ID) ? Decision.NO : Decision.YES);
+
+        RoutingTable.Builder routingTable = RoutingTable.builder();
+        routingTable.add(indexRoutingTable);
+        ClusterState state = createTestClusterState(
+            routingTable.build(),
+            org.elasticsearch.core.List.of(imd),
+            SingleNodeShutdownMetadata.Type.REMOVE
+        );
+
+        ShutdownShardMigrationStatus status = TransportGetShutdownStatusAction.shardMigrationStatus(
+            state,
+            SHUTTING_DOWN_NODE_ID,
+            SingleNodeShutdownMetadata.Type.REMOVE,
+            true,
+            clusterInfoService,
+            snapshotsInfoService,
+            allocationService,
+            allocationDeciders
+        );
+
+        assertShardMigration(
+            status,
+            SingleNodeShutdownMetadata.Status.COMPLETE,
+            0,
+            containsString("[1] shards cannot be moved away from this node but have at least one copy on another node in the cluster")
+        );
+    }
+
     public void testOnlyInitializingShardsRemaining() {
         Index index = new Index(randomAlphaOfLength(5), randomAlphaOfLengthBetween(1, 20));
         IndexMetadata imd = generateIndexMetadata(index, 3, 0);
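For readers who want the shape of the new status logic without the Elasticsearch plumbing, here is a minimal, self-contained sketch. It is hypothetical: ShutdownStatusSketch, ShardCopy, and resolveStatus are invented names that do not exist in Elasticsearch; the real logic is the stream pipeline in TransportGetShutdownStatusAction.shardMigrationStatus above, operating on ShardRouting instances. Requires Java 16+ (records).

import java.util.List;
import java.util.Set;

// Hypothetical, standalone model of the status resolution in the patch above.
public class ShutdownStatusSketch {

    enum Status { COMPLETE, STALLED, IN_PROGRESS }

    /** One copy of a shard: its shard number, host node, and whether it is STARTED. */
    record ShardCopy(int shardId, String nodeId, boolean started) {}

    static Status resolveStatus(
        List<ShardCopy> unmovableOnNode,   // shards on the shutting-down node whose move decision is NO
        List<ShardCopy> allCopies,         // every copy of every shard in the cluster
        Set<String> shuttingDownNodes,     // node IDs that have a shutdown record
        int totalRemainingShards           // shards still on the shutting-down node
    ) {
        int ignoredForFinalStatus = 0;
        for (ShardCopy unmovable : unmovableOnNode) {
            // Same test as the new `.filter(...)` stage: is there a STARTED copy of this
            // shard on a node that is *not* shutting down?
            boolean hasCopyOnOtherNode = allCopies.stream()
                .filter(copy -> copy.shardId() == unmovable.shardId())
                .filter(ShardCopy::started)
                .anyMatch(copy -> shuttingDownNodes.contains(copy.nodeId()) == false);
            if (hasCopyOnOtherNode) {
                ignoredForFinalStatus++; // a safe copy exists elsewhere: don't blame this shard
            } else {
                return Status.STALLED;   // genuinely stuck: no safe copy anywhere
            }
        }
        // Mirrors `totalRemainingShards == shardsToIgnoreForFinalStatus.get()`:
        // every unmovable shard was ignorable, so the node can finish shutting down.
        return totalRemainingShards == ignoredForFinalStatus ? Status.COMPLETE : Status.IN_PROGRESS;
    }

    public static void main(String[] args) {
        // One shard: primary on the stopping node, a STARTED replica on a live node.
        List<ShardCopy> copies = List.of(
            new ShardCopy(0, "node-stopping", true),
            new ShardCopy(0, "node-live", true)
        );
        List<ShardCopy> unmovable = List.of(new ShardCopy(0, "node-stopping", true));
        System.out.println(resolveStatus(unmovable, copies, Set.of("node-stopping"), 1)); // COMPLETE
    }
}

The design choice this mirrors: an unmovable shard only stalls the shutdown when no STARTED copy of it survives outside the set of shutting-down nodes, and shards skipped for that reason are counted separately so the final status can distinguish COMPLETE from IN_PROGRESS.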