Skip to content

Commit

Permalink
Gather unassigned replicas corresponding to newly-created primary uni…
Browse files Browse the repository at this point in the history
…quely (elastic#107794)

Related to the work in elastic#101638 this changes the way we calculate whether
all replicas are unassigned when corresponding to newly created
primaries. While this doesn't affect anything in Stateful ES on its own,
it's a building-block used for object-store-based ES (Serverless).

Semi-related to elastic#99951, though it does not solve (and does not strive to
solve) that issue.
  • Loading branch information
dakrone authored Apr 24, 2024
1 parent ee566e4 commit 0719c90
Show file tree
Hide file tree
Showing 2 changed files with 208 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterShardHealth;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
Expand Down Expand Up @@ -440,7 +441,8 @@ public class ShardAllocationCounts {
public void increment(ShardRouting routing, ClusterState state, NodesShutdownMetadata shutdowns, boolean verbose) {
boolean isNew = isUnassignedDueToNewInitialization(routing, state);
boolean isRestarting = isUnassignedDueToTimelyRestart(routing, shutdowns);
boolean allUnavailable = areAllShardsOfThisTypeUnavailable(routing, state);
boolean allUnavailable = areAllShardsOfThisTypeUnavailable(routing, state)
&& isNewlyCreatedAndInitializingReplica(routing, state) == false;
if (allUnavailable) {
indicesWithAllShardsUnavailable.add(routing.getIndexName());
}
Expand Down Expand Up @@ -498,7 +500,7 @@ private void addDefinition(Diagnosis.Definition diagnosisDefinition, String inde
* example: if a replica is passed then this will return true if ALL replicas are unassigned,
* but if at least one is assigned, it will return false.
*/
private boolean areAllShardsOfThisTypeUnavailable(ShardRouting routing, ClusterState state) {
boolean areAllShardsOfThisTypeUnavailable(ShardRouting routing, ClusterState state) {
return StreamSupport.stream(
state.routingTable().allActiveShardsGrouped(new String[] { routing.getIndexName() }, true).spliterator(),
false
Expand All @@ -509,6 +511,29 @@ private boolean areAllShardsOfThisTypeUnavailable(ShardRouting routing, ClusterS
.allMatch(ShardRouting::unassigned);
}

/**
* Returns true if the given shard is a replica that is only unassigned due to its primary being
* newly created. See {@link ClusterShardHealth#getInactivePrimaryHealth(ShardRouting)} for more
* information.
*
* We use this information when considering whether a cluster should turn red. For some cases
* (a newly created index having unassigned replicas for example), we don't want the cluster
* to turn "unhealthy" for the tiny amount of time before the shards are allocated.
*/
static boolean isNewlyCreatedAndInitializingReplica(ShardRouting routing, ClusterState state) {
if (routing.active()) {
return false;
}
if (routing.primary()) {
return false;
}
ShardRouting primary = state.routingTable().shardRoutingTable(routing.shardId()).primaryShard();
if (primary.active()) {
return false;
}
return ClusterShardHealth.getInactivePrimaryHealth(primary) == ClusterHealthStatus.YELLOW;
}

private static boolean isUnassignedDueToTimelyRestart(ShardRouting routing, NodesShutdownMetadata shutdowns) {
var info = routing.unassignedInfo();
if (info == null || info.getReason() != UnassignedInfo.Reason.NODE_RESTARTING) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
Expand Down Expand Up @@ -407,6 +408,95 @@ public void testAllReplicasUnassigned() {
);
assertTrue(status.replicas.doAnyIndicesHaveAllUnavailable());
}
{
ClusterState clusterState = createClusterStateWith(
List.of(
indexNewlyCreated(
"myindex",
new ShardAllocation(
randomNodeId(),
CREATING,
new UnassignedInfo(
UnassignedInfo.Reason.NODE_LEFT,
"message",
null,
0,
0,
0,
false,
UnassignedInfo.AllocationStatus.NO_ATTEMPT,
Set.of(),
null
)
), // Primary 1
new ShardAllocation(randomNodeId(), UNAVAILABLE) // Replica 1
)
),
List.of()
);
var service = createShardsAvailabilityIndicatorService(clusterState);
ShardAllocationStatus status = service.createNewStatus(clusterState.metadata());
ShardsAvailabilityHealthIndicatorService.updateShardAllocationStatus(
status,
clusterState,
NodesShutdownMetadata.EMPTY,
randomBoolean()
);
// Here because the replica is unassigned due to the primary being created, it's treated as though the replica can be ignored.
assertFalse(
"an unassigned replica from a newly created and initializing primary "
+ "should not be treated as an index with all replicas unavailable",
status.replicas.doAnyIndicesHaveAllUnavailable()
);
}

/*
A couple of tests for
{@link ShardsAvailabilityHealthIndicatorService#areAllShardsOfThisTypeUnavailable(ShardRouting, ClusterState)}
*/
{
IndexRoutingTable routingTable = indexWithTwoPrimaryOneReplicaShard(
"myindex",
new ShardAllocation(randomNodeId(), AVAILABLE), // Primary 1
new ShardAllocation(randomNodeId(), AVAILABLE), // Replica 1
new ShardAllocation(randomNodeId(), AVAILABLE), // Primary 2
new ShardAllocation(randomNodeId(), UNAVAILABLE) // Replica 2
);
ClusterState clusterState = createClusterStateWith(List.of(routingTable), List.of());
var service = createShardsAvailabilityIndicatorService(clusterState);
ShardAllocationStatus status = service.createNewStatus(clusterState.metadata());
ShardsAvailabilityHealthIndicatorService.updateShardAllocationStatus(
status,
clusterState,
NodesShutdownMetadata.EMPTY,
randomBoolean()
);
var shardRouting = routingTable.shardsWithState(ShardRoutingState.UNASSIGNED).get(0);
assertTrue(service.areAllShardsOfThisTypeUnavailable(shardRouting, clusterState));
}
{
ClusterState clusterState = createClusterStateWith(
List.of(
index(
"myindex",
new ShardAllocation(randomNodeId(), AVAILABLE),
new ShardAllocation(randomNodeId(), AVAILABLE),
new ShardAllocation(randomNodeId(), UNAVAILABLE)
)
),
List.of()
);
var service = createShardsAvailabilityIndicatorService(clusterState);
ShardAllocationStatus status = service.createNewStatus(clusterState.metadata());
ShardsAvailabilityHealthIndicatorService.updateShardAllocationStatus(
status,
clusterState,
NodesShutdownMetadata.EMPTY,
randomBoolean()
);
var shardRouting = clusterState.routingTable().index("myindex").shardsWithState(ShardRoutingState.UNASSIGNED).get(0);
assertFalse(service.areAllShardsOfThisTypeUnavailable(shardRouting, clusterState));
}
}

public void testShouldBeRedWhenThereAreUnassignedPrimariesAndNoReplicas() {
Expand Down Expand Up @@ -1913,6 +2003,72 @@ public void testMappedFieldsForTelemetry() {
}
}

public void testIsNewlyCreatedAndInitializingReplica() {
ShardId id = new ShardId("index", "uuid", 0);
IndexMetadata idxMeta = IndexMetadata.builder("index")
.numberOfShards(1)
.numberOfReplicas(1)
.settings(
Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 1)
.put("index.version.created", IndexVersion.current())
.put("index.uuid", "uuid")
.build()
)
.build();
ShardRouting primary = createShardRouting(id, true, new ShardAllocation("node", AVAILABLE));
var state = createClusterStateWith(List.of(index("index", new ShardAllocation("node", AVAILABLE))), List.of());
assertFalse(ShardsAvailabilityHealthIndicatorService.isNewlyCreatedAndInitializingReplica(primary, state));

ShardRouting replica = createShardRouting(id, false, new ShardAllocation("node", AVAILABLE));
state = createClusterStateWith(List.of(index("index", new ShardAllocation("node", AVAILABLE))), List.of());
assertFalse(ShardsAvailabilityHealthIndicatorService.isNewlyCreatedAndInitializingReplica(replica, state));

ShardRouting unassignedReplica = createShardRouting(id, false, new ShardAllocation("node", UNAVAILABLE));
state = createClusterStateWith(
List.of(idxMeta),
List.of(index("index", "uuid", new ShardAllocation("node", UNAVAILABLE))),
List.of(),
List.of()
);
assertFalse(ShardsAvailabilityHealthIndicatorService.isNewlyCreatedAndInitializingReplica(unassignedReplica, state));

UnassignedInfo.Reason reason = randomFrom(UnassignedInfo.Reason.NODE_LEFT, UnassignedInfo.Reason.NODE_RESTARTING);
ShardAllocation allocation = new ShardAllocation(
"node",
UNAVAILABLE,
new UnassignedInfo(
reason,
"message",
null,
0,
0,
0,
randomBoolean(),
randomFrom(UnassignedInfo.AllocationStatus.values()),
Set.of(),
reason == UnassignedInfo.Reason.NODE_LEFT ? null : randomAlphaOfLength(20)
)
);
ShardRouting unallocatedReplica = createShardRouting(id, false, allocation);
state = createClusterStateWith(
List.of(idxMeta),
List.of(index(idxMeta, new ShardAllocation("node", UNAVAILABLE), allocation)),
List.of(),
List.of()
);
assertFalse(ShardsAvailabilityHealthIndicatorService.isNewlyCreatedAndInitializingReplica(unallocatedReplica, state));

state = createClusterStateWith(
List.of(idxMeta),
List.of(index(idxMeta, new ShardAllocation("node", CREATING), allocation)),
List.of(),
List.of()
);
assertTrue(ShardsAvailabilityHealthIndicatorService.isNewlyCreatedAndInitializingReplica(unallocatedReplica, state));
}

private HealthIndicatorResult createExpectedResult(
HealthStatus status,
String symptom,
Expand Down Expand Up @@ -2038,9 +2194,18 @@ private static Map<String, Object> addDefaults(Map<String, Object> override) {
}

private static IndexRoutingTable index(String name, ShardAllocation primaryState, ShardAllocation... replicaStates) {
return index(name, "_na_", primaryState, replicaStates);
}

private static IndexRoutingTable index(String name, String uuid, ShardAllocation primaryState, ShardAllocation... replicaStates) {
return index(
IndexMetadata.builder(name)
.settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()).build())
.settings(
Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())
.put(IndexMetadata.SETTING_INDEX_UUID, uuid)
.build()
)
.numberOfShards(1)
.numberOfReplicas(replicaStates.length)
.build(),
Expand All @@ -2049,6 +2214,21 @@ private static IndexRoutingTable index(String name, ShardAllocation primaryState
);
}

private static IndexRoutingTable indexNewlyCreated(String name, ShardAllocation primary1State, ShardAllocation replica1State) {
var indexMetadata = IndexMetadata.builder(name)
.settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()).build())
.numberOfShards(1)
.numberOfReplicas(1)
.build();
var index = indexMetadata.getIndex();
var shard1Id = new ShardId(index, 0);

var builder = IndexRoutingTable.builder(index);
builder.addShard(createShardRouting(shard1Id, true, primary1State));
builder.addShard(createShardRouting(shard1Id, false, replica1State));
return builder.build();
}

private static IndexRoutingTable indexWithTwoPrimaryOneReplicaShard(
String name,
ShardAllocation primary1State,
Expand Down

0 comments on commit 0719c90

Please sign in to comment.