Skip to content

Commit

Permalink
[Monitoring] Make unassigned replica shard documents unique (#91153)
Browse files Browse the repository at this point in the history
* [Monitoring] Make unassigned replica shard documents unique

* Fix failing tests

* Use IndexRoutingTable/IndexShardRoutingTable to resolve shard order

* Use Random in test when shuffling

* Use Set to avoid duplicates and inline variable
  • Loading branch information
miltonhultgren authored Jan 10, 2023
1 parent 5316222 commit 232a64e
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,20 @@ public class ShardMonitoringDoc extends FilteredMonitoringDoc {
final long interval,
final MonitoringDoc.Node node,
final ShardRouting shardRouting,
final String clusterStateUUID
final String clusterStateUUID,
final int shardIndex
) {

super(cluster, timestamp, interval, node, MonitoredSystem.ES, TYPE, id(clusterStateUUID, shardRouting), XCONTENT_FILTERS);
super(
cluster,
timestamp,
interval,
node,
MonitoredSystem.ES,
TYPE,
id(clusterStateUUID, shardRouting, shardIndex),
XCONTENT_FILTERS
);
this.shardRouting = Objects.requireNonNull(shardRouting);
this.clusterStateUUID = Objects.requireNonNull(clusterStateUUID);
}
Expand All @@ -61,9 +71,9 @@ protected void innerToXContent(XContentBuilder builder, Params params) throws IO
/**
* Compute an id that has the format:
*
* {state_uuid}:{node_id || '_na'}:{index}:{shard}:{'p' || 'r'}
* {state_uuid}:{node_id || '_na'}:{index}:s{shard}:{'p' || 'rX'}
*/
public static String id(String stateUUID, ShardRouting shardRouting) {
public static String id(String stateUUID, ShardRouting shardRouting, int shardIndex) {
StringBuilder builder = new StringBuilder();
builder.append(stateUUID);
builder.append(':');
Expand All @@ -74,13 +84,14 @@ public static String id(String stateUUID, ShardRouting shardRouting) {
}
builder.append(':');
builder.append(shardRouting.getIndexName());
builder.append(':');
builder.append(":s");
builder.append(Integer.valueOf(shardRouting.id()));
builder.append(':');
if (shardRouting.primary()) {
builder.append("p");
} else {
builder.append("r");
builder.append(Integer.valueOf(shardIndex));
}
return builder.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -20,7 +22,9 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* Collector for shards.
Expand All @@ -46,28 +50,69 @@ protected Collection<MonitoringDoc> doCollect(final MonitoringDoc.Node node, fin
if (clusterState != null) {
RoutingTable routingTable = clusterState.routingTable();
if (routingTable != null) {
List<ShardRouting> shards = routingTable.allShards();
if (shards != null) {
final String clusterUuid = clusterUuid(clusterState);
final String stateUUID = clusterState.stateUUID();
final long timestamp = timestamp();
final String clusterUuid = clusterUuid(clusterState);
final String stateUUID = clusterState.stateUUID();
final long timestamp = timestamp();

final String[] indices = getCollectionIndices();
final boolean isAllIndices = IndexNameExpressionResolver.isAllIndices(Arrays.asList(indices));
final String[] indicesToMonitor = getCollectionIndices();
final boolean isAllIndices = IndexNameExpressionResolver.isAllIndices(Arrays.asList(indicesToMonitor));
final String[] indices = isAllIndices
? routingTable.indicesRouting().keySet().toArray(new String[0])
: expandIndexPattern(indicesToMonitor, routingTable.indicesRouting().keySet().toArray(new String[0]));

for (ShardRouting shard : shards) {
if (isAllIndices || Regex.simpleMatch(indices, shard.getIndexName())) {
MonitoringDoc.Node shardNode = null;
if (shard.assignedToNode()) {
for (String index : indices) {
IndexRoutingTable indexRoutingTable = routingTable.index(index);
if (indexRoutingTable != null) {
final int shardCount = indexRoutingTable.size();
for (int i = 0; i < shardCount; i++) {
IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(i);

ShardRouting primary = shardRoutingTable.primaryShard();
MonitoringDoc.Node primaryShardNode = null;
if (primary.assignedToNode()) {
// If the shard is assigned to a node, the shard monitoring document refers to this node
shardNode = convertNode(node.getTimestamp(), clusterState.getNodes().get(shard.currentNodeId()));
primaryShardNode = convertNode(node.getTimestamp(), clusterState.getNodes().get(primary.currentNodeId()));
}
results.add(new ShardMonitoringDoc(clusterUuid, timestamp, interval, primaryShardNode, primary, stateUUID, 0));

List<ShardRouting> replicas = shardRoutingTable.replicaShards();
for (int j = 0; j < replicas.size(); j++) {
ShardRouting replica = replicas.get(j);

MonitoringDoc.Node replicaShardNode = null;
if (replica.assignedToNode()) {
replicaShardNode = convertNode(
node.getTimestamp(),
clusterState.getNodes().get(replica.currentNodeId())
);
}
results.add(
new ShardMonitoringDoc(clusterUuid, timestamp, interval, replicaShardNode, replica, stateUUID, j + 1)
);
}
results.add(new ShardMonitoringDoc(clusterUuid, timestamp, interval, shardNode, shard, stateUUID));
}
}
}
}
}
return Collections.unmodifiableCollection(results);
}

/**
 * Resolves the configured collection indices against a list of concrete index names.
 *
 * Entries containing a {@code *} are treated as wildcard patterns and are replaced by every
 * name in {@code indices} accepted by {@code Regex.simpleMatch}. Entries without a {@code *}
 * are kept verbatim, whether or not they occur in {@code indices}.
 *
 * @param indicesToMonitor configured index names and/or wildcard patterns
 * @param indices concrete index names that wildcard patterns are matched against
 * @return the de-duplicated resulting names; ordering follows the backing {@link HashSet}
 */
private String[] expandIndexPattern(String[] indicesToMonitor, String[] indices) {
    final Set<String> resolved = new HashSet<>();

    for (String candidate : indicesToMonitor) {
        if (!candidate.contains("*")) {
            // Plain name: nothing to expand, keep it as configured.
            resolved.add(candidate);
            continue;
        }
        // Wildcard pattern: collect every concrete name it matches.
        for (String concreteName : indices) {
            if (Regex.simpleMatch(candidate, concreteName)) {
                resolved.add(concreteName);
            }
        }
    }

    return resolved.toArray(new String[0]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
package org.elasticsearch.xpack.monitoring.collector.shards;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
Expand All @@ -20,7 +22,10 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
Expand All @@ -30,6 +35,7 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.matchesPattern;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -77,7 +83,12 @@ public void testDoCollect() throws Exception {
final String stateUUID = UUID.randomUUID().toString();
when(clusterState.stateUUID()).thenReturn(stateUUID);

final String[] indices = randomFrom(NONE, Strings.EMPTY_ARRAY, new String[] { "_all" }, new String[] { "_index*" });
final String[] indices = randomFrom(
NONE,
Strings.EMPTY_ARRAY,
new String[] { "_all" },
new String[] { "_index*", "_does-not-exist" }
);
withCollectionIndices(indices);

final RoutingTable routingTable = mockRoutingTable();
Expand Down Expand Up @@ -111,7 +122,11 @@ public void testDoCollect() throws Exception {
assertThat(document.getIntervalMillis(), equalTo(interval));
assertThat(document.getSystem(), is(MonitoredSystem.ES));
assertThat(document.getType(), equalTo(ShardMonitoringDoc.TYPE));
assertThat(document.getId(), equalTo(ShardMonitoringDoc.id(stateUUID, document.getShardRouting())));
if (document.getShardRouting().primary()) {
assertThat(document.getId(), equalTo(ShardMonitoringDoc.id(stateUUID, document.getShardRouting(), 0)));
} else {
assertThat(document.getId(), matchesPattern("^[\\w-]+:(_current|_na)+:_index:s\\d:r[1-9]+[0-9]*"));
}
assertThat(document.getClusterStateUUID(), equalTo(stateUUID));

if (document.getShardRouting().assignedToNode()) {
Expand All @@ -130,15 +145,60 @@ public void testDoCollect() throws Exception {
private static RoutingTable mockRoutingTable() {
final List<ShardRouting> allShards = new ArrayList<>();

final int nbShards = randomIntBetween(0, 10);
for (int i = 0; i < nbShards; i++) {
final int numberOfPrimaryShards = randomIntBetween(0, 10);
for (int i = 0; i < numberOfPrimaryShards; i++) {
ShardRoutingState state = randomFrom(STARTED, UNASSIGNED);
ShardId shardId = new ShardId("_index", randomAlphaOfLength(12), i);
allShards.add(TestShardRouting.newShardRouting(shardId, state == STARTED ? "_current" : null, true, state));
ShardRouting primary = TestShardRouting.newShardRouting(shardId, state == STARTED ? "_current" : null, true, state);
allShards.add(primary);
}

final int numberOfReplicaShards = randomIntBetween(0, 3);
for (int i = 0; i < numberOfPrimaryShards; i++) {
for (int j = 0; j < numberOfReplicaShards; j++) {
ShardRoutingState state = randomFrom(STARTED, UNASSIGNED);
ShardId shardId = new ShardId("_index", randomAlphaOfLength(12), i);
ShardRouting replica = TestShardRouting.newShardRouting(shardId, state == STARTED ? "_current" : null, false, state);
allShards.add(replica);
}
}

final RoutingTable routingTable = mock(RoutingTable.class);
// _index* matches the test data above
when(routingTable.allShards("_index*")).thenReturn(allShards);
// _all is reserved to mean all indices in the routing table
when(routingTable.allShards("_all")).thenReturn(allShards);
// When collection indices is set to [], it's treated the same as "_all", so the key set of the routing table is used to grab the
// index names
when(routingTable.allShards("_index")).thenReturn(allShards);
// This mock routing table only has the index named "_index", so if collection indices is set to ["_none"] no shards should be
// found.
when(routingTable.allShards("_none")).thenReturn(new ArrayList<>(0));

final IndexRoutingTable indexRoutingTable = mock(IndexRoutingTable.class);
final Map<String, IndexRoutingTable> indicesRouting = Map.of("_index", indexRoutingTable);
when(routingTable.indicesRouting()).thenReturn(indicesRouting);
when(routingTable.index("_index")).thenReturn(indexRoutingTable);

when(indexRoutingTable.size()).thenReturn(numberOfPrimaryShards);
for (int i = 0; i < numberOfPrimaryShards; i++) {
final IndexShardRoutingTable shardRoutingTable = mock(IndexShardRoutingTable.class);
when(indexRoutingTable.shard(i)).thenReturn(shardRoutingTable);
when(shardRoutingTable.primaryShard()).thenReturn(allShards.get(i));
List<ShardRouting> replicas = new ArrayList<>();
int replicaIndexStart = numberOfPrimaryShards + i * numberOfReplicaShards;
int replicaIndexEnd = replicaIndexStart + numberOfReplicaShards;
for (int j = replicaIndexStart; j < replicaIndexEnd; j++) {
replicas.add(allShards.get(j));
}
when(shardRoutingTable.replicaShards()).thenReturn(replicas);
}

// This is only used by the test to decide how many shards should be covered
when(routingTable.allShards()).thenReturn(allShards);

Collections.shuffle(allShards, new Random(numberOfPrimaryShards));

return routingTable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ protected ShardMonitoringDoc createMonitoringDoc(
String type,
String id
) {
return new ShardMonitoringDoc(cluster, timestamp, interval, node, shardRouting, stateUuid);
return new ShardMonitoringDoc(cluster, timestamp, interval, node, shardRouting, stateUuid, shardRouting.primary() ? 0 : 1);
}

@Override
protected void assertFilteredMonitoringDoc(final ShardMonitoringDoc document) {
assertThat(document.getSystem(), is(MonitoredSystem.ES));
assertThat(document.getType(), is(ShardMonitoringDoc.TYPE));
assertThat(document.getId(), equalTo(ShardMonitoringDoc.id(stateUuid, shardRouting)));
assertThat(document.getId(), equalTo(ShardMonitoringDoc.id(stateUuid, shardRouting, shardRouting.primary() ? 0 : 1)));

assertThat(document.getShardRouting(), is(shardRouting));
if (assignedToNode) {
Expand All @@ -82,31 +82,31 @@ protected Set<String> getExpectedXContentFilters() {
}

public void testConstructorShardRoutingMustNotBeNull() {
expectThrows(NullPointerException.class, () -> new ShardMonitoringDoc(cluster, timestamp, interval, node, null, stateUuid));
expectThrows(NullPointerException.class, () -> new ShardMonitoringDoc(cluster, timestamp, interval, node, null, stateUuid, 0));
}

public void testConstructorStateUuidMustNotBeNull() {
expectThrows(NullPointerException.class, () -> new ShardMonitoringDoc(cluster, timestamp, interval, node, shardRouting, null));
expectThrows(NullPointerException.class, () -> new ShardMonitoringDoc(cluster, timestamp, interval, node, shardRouting, null, 0));
}

public void testIdWithPrimaryShardAssigned() {
shardRouting = newShardRouting("_index_0", 123, "_node_0", randomAlphaOfLength(5), true, INITIALIZING);
assertEquals("_state_uuid_0:_node_0:_index_0:123:p", ShardMonitoringDoc.id("_state_uuid_0", shardRouting));
assertEquals("_state_uuid_0:_node_0:_index_0:s123:p", ShardMonitoringDoc.id("_state_uuid_0", shardRouting, 0));
}

public void testIdWithReplicaShardAssigned() {
shardRouting = newShardRouting("_index_1", 456, "_node_1", randomAlphaOfLength(5), false, INITIALIZING);
assertEquals("_state_uuid_1:_node_1:_index_1:456:r", ShardMonitoringDoc.id("_state_uuid_1", shardRouting));
assertEquals("_state_uuid_1:_node_1:_index_1:s456:r1", ShardMonitoringDoc.id("_state_uuid_1", shardRouting, 1));
}

public void testIdWithPrimaryShardUnassigned() {
shardRouting = newShardRouting("_index_2", 789, null, true, UNASSIGNED);
assertEquals("_state_uuid_2:_na:_index_2:789:p", ShardMonitoringDoc.id("_state_uuid_2", shardRouting));
assertEquals("_state_uuid_2:_na:_index_2:s789:p", ShardMonitoringDoc.id("_state_uuid_2", shardRouting, 0));
}

public void testIdWithReplicaShardUnassigned() {
shardRouting = newShardRouting("_index_3", 159, null, false, UNASSIGNED);
assertEquals("_state_uuid_3:_na:_index_3:159:r", ShardMonitoringDoc.id("_state_uuid_3", shardRouting));
assertEquals("_state_uuid_3:_na:_index_3:s159:r1", ShardMonitoringDoc.id("_state_uuid_3", shardRouting, 1));
}

@Override
Expand All @@ -119,7 +119,8 @@ public void testToXContent() throws IOException {
1506593717631L,
node,
shardRouting,
"_state_uuid"
"_state_uuid",
0
);

final BytesReference xContent = XContentHelper.toXContent(doc, XContentType.JSON, randomBoolean());
Expand Down

0 comments on commit 232a64e

Please sign in to comment.