Skip to content

Commit

Permalink
Making _cat/allocation API not use shardLevelStats
Browse files Browse the repository at this point in the history
Signed-off-by: Harsh Garg <gkharsh@amazon.com>
  • Loading branch information
Harsh Garg committed Aug 19, 2024
1 parent 062a8d1 commit a14c91b
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 25 deletions.
30 changes: 29 additions & 1 deletion server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -694,7 +695,10 @@ public NodeIndicesStats stats(CommonStatsFlags flags) {
}
if (flags.getIncludeIndicesStatsByLevel()) {
NodeIndicesStats.StatsLevel statsLevel = NodeIndicesStats.getAcceptedLevel(flags.getLevels());
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats, statsLevel);
if (statsLevel == null || NodeIndicesStats.StatsLevel.INDICES.equals(statsLevel)) {
return new NodeIndicesStats(commonStats, null, statsByIndex(this, flags), statsLevel, searchRequestStats);
}
return new NodeIndicesStats(commonStats, statsByShard(this, flags), null, statsLevel, searchRequestStats);
} else {
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats);
}
Expand Down Expand Up @@ -727,6 +731,30 @@ Map<Index, List<IndexShardStats>> statsByShard(final IndicesService indicesServi
return statsByShard;
}

Map<Index, CommonStats> statsByIndex(final IndicesService indicesService, final CommonStatsFlags flags) {
final Map<Index, CommonStats> statsByIndex = new HashMap<>();

for (final IndexService indexService : indicesService) {
for (final IndexShard indexShard : indexService) {
try {
final IndexShardStats indexShardStats = indicesService.indexShardStats(indicesService, indexShard, flags);
if (indexShardStats == null) {
continue;
}
if (!statsByIndex.containsKey(indexShard.shardId().getIndex())) {
statsByIndex.put(indexShard.shardId().getIndex(), new CommonStats());
}
Arrays.stream(indexShardStats.getShards())
.forEach(shardStats -> statsByIndex.get(indexShard.shardId().getIndex()).add(shardStats.getStats()));
} catch (IllegalIndexShardStateException | AlreadyClosedException e) {
// we can safely ignore illegal state on ones that are closing for example
logger.trace(() -> new ParameterizedMessage("{} ignoring shard stats", indexShard.shardId()), e);
}
}
}
return statsByIndex;
}

IndexShardStats indexShardStats(final IndicesService indicesService, final IndexShard indexShard, final CommonStatsFlags flags) {
if (indexShard.routingEntry() == null) {
return null;
Expand Down
27 changes: 16 additions & 11 deletions server/src/main/java/org/opensearch/indices/NodeIndicesStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,33 +127,38 @@ public NodeIndicesStats(CommonStats oldStats, Map<Index, List<IndexShardStats>>
public NodeIndicesStats(
CommonStats oldStats,
Map<Index, List<IndexShardStats>> statsByShard,
SearchRequestStats searchRequestStats,
StatsLevel level
Map<Index, CommonStats> statsByIndex,
StatsLevel level,
SearchRequestStats searchRequestStats
) {
// make a total common stats from old ones and current ones
this.stats = oldStats;
for (List<IndexShardStats> shardStatsList : statsByShard.values()) {
for (IndexShardStats indexShardStats : shardStatsList) {
for (ShardStats shardStats : indexShardStats.getShards()) {
stats.add(shardStats.getStats());
if (statsByIndex != null) {
statsByIndex.values().forEach(commonStats -> stats.add(commonStats));
} else {
for (List<IndexShardStats> shardStatsList : statsByShard.values()) {
for (IndexShardStats indexShardStats : shardStatsList) {
for (ShardStats shardStats : indexShardStats.getShards()) {
stats.add(shardStats.getStats());
}
}
}
}

if (this.stats.search != null) {
this.stats.search.setSearchRequestStats(searchRequestStats);
}

if (level != null) {
switch (level) {
case INDICES:
this.statsByIndex = createStatsByIndex(statsByShard);
this.statsByIndex = statsByIndex;
break;
case SHARDS:
this.statsByShard = statsByShard;
break;
}
}

if (this.stats.search != null) {
this.stats.search.setSearchRequestStats(searchRequestStats);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ public void processResponse(final ClusterStateResponse state) {
statsRequest.clear()
.addMetric(NodesStatsRequest.Metric.FS.metricName())
.indices(new CommonStatsFlags(CommonStatsFlags.Flag.Store));
statsRequest.indices().setIncludeIndicesStatsByLevel(true);
statsRequest.setIncludeDiscoveryNodes(false);

client.admin().cluster().nodesStats(statsRequest, new RestResponseListener<NodesStatsResponse>(channel) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1106,10 +1106,11 @@ public MockNodeIndicesStats(
public MockNodeIndicesStats(
CommonStats oldStats,
Map<Index, List<IndexShardStats>> statsByShard,
SearchRequestStats searchRequestStats,
StatsLevel level
Map<Index, CommonStats> statsByIndex,
StatsLevel level,
SearchRequestStats searchRequestStats
) {
super(oldStats, statsByShard, searchRequestStats, level);
super(oldStats, statsByShard, statsByIndex, level, searchRequestStats);
}

public CommonStats getStats() {
Expand Down Expand Up @@ -1300,6 +1301,7 @@ public void testNodeIndicesStatsWithAndWithoutAggregations() throws IOException

ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
HashMap<Index, List<IndexShardStats>> statsByShards = createRandomShardByStats(indexList);
Map<Index, CommonStats> statsByIndex = createStatsByIndex(statsByShards);

final MockNodeIndicesStats nonAggregatedNodeIndicesStats = new MockNodeIndicesStats(
new CommonStats(commonStatsFlags),
Expand All @@ -1310,12 +1312,24 @@ public void testNodeIndicesStatsWithAndWithoutAggregations() throws IOException
commonStatsFlags.setIncludeIndicesStatsByLevel(true);

Arrays.stream(NodeIndicesStats.StatsLevel.values()).forEach(level -> {
MockNodeIndicesStats aggregatedNodeIndicesStats = new MockNodeIndicesStats(
new CommonStats(commonStatsFlags),
statsByShards,
new SearchRequestStats(clusterSettings),
level
);
MockNodeIndicesStats aggregatedNodeIndicesStats;
if (level.equals(NodeIndicesStats.StatsLevel.INDICES) || level.equals(NodeIndicesStats.StatsLevel.NODE)) {
aggregatedNodeIndicesStats = new MockNodeIndicesStats(
new CommonStats(commonStatsFlags),
null,
statsByIndex,
level,
new SearchRequestStats(clusterSettings)
);
} else {
aggregatedNodeIndicesStats = new MockNodeIndicesStats(
new CommonStats(commonStatsFlags),
statsByShards,
null,
level,
new SearchRequestStats(clusterSettings)
);
}

XContentBuilder nonAggregatedBuilder = null;
XContentBuilder aggregatedBuilder = null;
Expand Down Expand Up @@ -1406,6 +1420,22 @@ private HashMap<Index, List<IndexShardStats>> createRandomShardByStats(List<Inde
return statsByShards;
}

private Map<Index, CommonStats> createStatsByIndex(Map<Index, List<IndexShardStats>> statsByShards) {
Map<Index, CommonStats> statsByIndex = new HashMap<>();
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShards.entrySet()) {
if (!statsByIndex.containsKey(entry.getKey())) {
statsByIndex.put(entry.getKey(), new CommonStats());
}

for (IndexShardStats indexShardStats : entry.getValue()) {
for (ShardStats shardStats : indexShardStats.getShards()) {
statsByIndex.get(entry.getKey()).add(shardStats.getStats());
}
}
}
return statsByIndex;
}

private Map<String, Object> xContentBuilderToMap(XContentBuilder xContentBuilder) {
return XContentHelper.convertToMap(BytesReference.bytes(xContentBuilder), true, xContentBuilder.contentType()).v2();
}
Expand Down Expand Up @@ -1451,15 +1481,27 @@ public MockNodeIndicesStats generateMockNodeIndicesStats(
}

statsByShard.put(statsIndex, indexShardStatsList);
Map<Index, CommonStats> statsByIndex = createStatsByIndex(statsByShard);

ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);

if (commonStatsFlags.getIncludeIndicesStatsByLevel()) {

if (level == null || level.equals(NodeIndicesStats.StatsLevel.INDICES) || level.equals(NodeIndicesStats.StatsLevel.NODE)) {
return new MockNodeIndicesStats(
new CommonStats(commonStatsFlags),
null,
statsByIndex,
level,
new SearchRequestStats(clusterSettings)
);
}
return new MockNodeIndicesStats(
new CommonStats(commonStatsFlags),
statsByShard,
new SearchRequestStats(clusterSettings),
level
null,
level,
new SearchRequestStats(clusterSettings)
);
} else {
return new MockNodeIndicesStats(new CommonStats(commonStatsFlags), statsByShard, new SearchRequestStats(clusterSettings));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,8 +699,8 @@ public void onFailure(String source, Exception e) {
});
assertBusy(mockAppender::assertAllExpectationsMatched);
// verify stats values after state is published
assertEquals(1, clusterManagerService.getClusterStateStats().getUpdateSuccess());
assertEquals(0, clusterManagerService.getClusterStateStats().getUpdateFailed());
assertTrue(waitUntil(() -> clusterManagerService.getClusterStateStats().getUpdateSuccess() == 1 &&
clusterManagerService.getClusterStateStats().getUpdateFailed() == 0));
}
}
}
Expand Down

0 comments on commit a14c91b

Please sign in to comment.