Optimize NodeIndicesStats output behind flag #14454

Merged
Changes from all commits
Commits
32 commits
f6d3b58
Optimize NodeIndicesStats output behind flag
Pranshu-S Jun 27, 2024
af0ab18
Merge remote-tracking branch 'origin/main' into NodeStatsOptimisation…
Pranshu-S Jul 10, 2024
a267aef
Modify optimisation approach without Cluster Settings
Pranshu-S Jul 10, 2024
e009a84
Merge remote-tracking branch 'origin/main' into NodeStatsOptimisation…
Pranshu-S Jul 10, 2024
857e7e1
Bumping changes to major version 3_0_0 to avoid BWC test failures
Pranshu-S Jul 11, 2024
9876c13
Fix Failing Tests
Pranshu-S Jul 12, 2024
61bf2e0
Fix test logic
Pranshu-S Jul 12, 2024
73de188
fix test for misc level param output
Pranshu-S Jul 15, 2024
339bd12
Retry Build
Pranshu-S Jul 15, 2024
fa2373f
Retry Build
Pranshu-S Jul 15, 2024
b7db4bd
Add ChangeLogs
Pranshu-S Jul 16, 2024
40fa8ce
Adding code coverage
Pranshu-S Jul 17, 2024
8aef256
Fix Flakyness of tests
Pranshu-S Jul 17, 2024
6672c25
Adding UTs for code coverage
Pranshu-S Jul 18, 2024
c148d0f
fix spotless checks and more UTs
Pranshu-S Jul 18, 2024
8416ec8
Refactoring naming
Pranshu-S Jul 18, 2024
afeb38d
Making Fields as enum and refacotring NodesStatsRequestBuilder
Pranshu-S Jul 20, 2024
b657072
Fix level selection
Pranshu-S Jul 20, 2024
67749bd
Add comments and variable refactoring
Pranshu-S Jul 22, 2024
768456a
Merge remote-tracking branch 'origin/main' into NodeStatsOptimisation…
Pranshu-S Jul 22, 2024
aef96d9
Fix ChangeLogs
Pranshu-S Jul 22, 2024
8029c78
Merge remote-tracking branch 'origin/main' into NodeStatsOptimisation…
Pranshu-S Jul 22, 2024
1128006
Retry Build
Pranshu-S Jul 22, 2024
5203097
Adding tests and addressing nit comments
Pranshu-S Jul 23, 2024
a0dff3f
Addressing comments
Pranshu-S Jul 23, 2024
ff66431
Add Node Level Validations
Pranshu-S Jul 23, 2024
d78c948
Refactor validation and simplify tests
Pranshu-S Jul 23, 2024
1cf7962
Merge remote-tracking branch 'origin/main' into NodeStatsOptimisation…
Pranshu-S Aug 7, 2024
88d9af7
Merge remote-tracking branch 'origin/main' into NodeStatsOptimisation…
Pranshu-S Aug 28, 2024
bcceb86
Addressing comments
Pranshu-S Aug 28, 2024
2f4187f
Merge remote-tracking branch 'origin/main' into NodeStatsOptimisation…
Pranshu-S Aug 29, 2024
40e0fc9
Retry Build
Pranshu-S Aug 29, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))
- [Workload Management] Add query group stats constructs ([#15343](https://github.com/opensearch-project/OpenSearch/pull/15343)))
- Add runAs to Subject interface and introduce IdentityAwarePlugin extension point ([#14630](https://github.com/opensearch-project/OpenSearch/pull/14630))
- Optimize NodeIndicesStats output behind flag ([#14454](https://github.com/opensearch-project/OpenSearch/pull/14454))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))

Large diffs are not rendered by default.

@@ -67,6 +67,7 @@ public class CommonStatsFlags implements Writeable, Cloneable {
// Used for metric CACHE_STATS, to determine which caches to report stats for
private EnumSet<CacheType> includeCaches = EnumSet.noneOf(CacheType.class);
private String[] levels = new String[0];
private boolean includeIndicesStatsByLevel = false;

/**
* @param flags flags to set. If no flags are supplied, default flags will be set.
@@ -100,6 +101,9 @@ public CommonStatsFlags(StreamInput in) throws IOException {
includeCaches = in.readEnumSet(CacheType.class);
levels = in.readStringArray();
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
includeIndicesStatsByLevel = in.readBoolean();
}
}

@Override
@@ -124,6 +128,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeEnumSet(includeCaches);
out.writeStringArrayNullable(levels);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(includeIndicesStatsByLevel);
}
}

/**
@@ -262,6 +269,14 @@ public boolean includeSegmentFileSizes() {
return this.includeSegmentFileSizes;
}

public void setIncludeIndicesStatsByLevel(boolean includeIndicesStatsByLevel) {
this.includeIndicesStatsByLevel = includeIndicesStatsByLevel;
}

public boolean getIncludeIndicesStatsByLevel() {
return this.includeIndicesStatsByLevel;
}

public boolean isSet(Flag flag) {
return flags.contains(flag);
}
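
For readers following the CommonStatsFlags change above, here is a minimal sketch of how a caller could combine the existing levels field with the new includeIndicesStatsByLevel flag. The setters and getters come from this diff; the no-arg constructor and the package (org.opensearch.action.admin.indices.stats) are assumptions based on the surrounding OpenSearch code rather than something shown in this PR.

```java
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;

public class CommonStatsFlagsLevelSketch {
    public static void main(String[] args) {
        // Assumed no-arg construction; default flags are applied.
        CommonStatsFlags flags = new CommonStatsFlags();

        // Ask for index-level aggregation and opt in to the optimized NodeIndicesStats output.
        flags.setLevels(new String[] { "indices" });
        flags.setIncludeIndicesStatsByLevel(true);

        // The flag is only written to / read from the wire for nodes on V_3_0_0 or later
        // (see the version checks above), so older nodes keep the previous per-shard behavior.
        System.out.println(
            "levels=" + String.join(",", flags.getLevels())
                + ", includeIndicesStatsByLevel=" + flags.getIncludeIndicesStatsByLevel()
        );
    }
}
```
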
@@ -693,8 +693,12 @@
break;
}
}

return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats);
if (flags.getIncludeIndicesStatsByLevel()) {
NodeIndicesStats.StatsLevel statsLevel = NodeIndicesStats.getAcceptedLevel(flags.getLevels());
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats, statsLevel);

(Codecov warning: added lines server/src/main/java/org/opensearch/indices/IndicesService.java#L697-L698 not covered by tests)
} else {
return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats);
}
}

Map<Index, List<IndexShardStats>> statsByShard(final IndicesService indicesService, final CommonStatsFlags flags) {
199 changes: 171 additions & 28 deletions server/src/main/java/org/opensearch/indices/NodeIndicesStats.java
@@ -32,6 +32,7 @@

package org.opensearch.indices;

import org.opensearch.Version;
import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.action.admin.indices.stats.IndexShardStats;
import org.opensearch.action.admin.indices.stats.ShardStats;
@@ -63,9 +64,11 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* Global information on indices stats running on a specific node.
@@ -74,26 +77,27 @@
*/
@PublicApi(since = "1.0.0")
public class NodeIndicesStats implements Writeable, ToXContentFragment {
private CommonStats stats;
private Map<Index, List<IndexShardStats>> statsByShard;
protected CommonStats stats;
protected Map<Index, CommonStats> statsByIndex;
protected Map<Index, List<IndexShardStats>> statsByShard;

public NodeIndicesStats(StreamInput in) throws IOException {
stats = new CommonStats(in);
if (in.readBoolean()) {
int entries = in.readVInt();
statsByShard = new HashMap<>();
for (int i = 0; i < entries; i++) {
Index index = new Index(in);
int indexShardListSize = in.readVInt();
List<IndexShardStats> indexShardStats = new ArrayList<>(indexShardListSize);
for (int j = 0; j < indexShardListSize; j++) {
indexShardStats.add(new IndexShardStats(in));
}
statsByShard.put(index, indexShardStats);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
// contains statsByIndex
if (in.readBoolean()) {
statsByIndex = readStatsByIndex(in);
}
}
if (in.readBoolean()) {
statsByShard = readStatsByShard(in);
}
}

/**
* When no level information is passed to the constructor, we return the node-level aggregated stats as
* {@link CommonStats} along with a map of Index to the list of its shard stats.
*/
public NodeIndicesStats(CommonStats oldStats, Map<Index, List<IndexShardStats>> statsByShard, SearchRequestStats searchRequestStats) {
// this.stats = stats;
this.statsByShard = statsByShard;
@@ -112,6 +116,90 @@
}
}

/**
* Passing the level information to the nodes lets each node aggregate its stats at the requested level: node
* level by default (when no level is passed), index level when the `indices` level is passed, and the raw
* statsByShard map when the `shards` level is passed. This reduces ser/de of the stats and returns only the
* information the client actually asked for.
*/
public NodeIndicesStats(
CommonStats oldStats,
Map<Index, List<IndexShardStats>> statsByShard,
SearchRequestStats searchRequestStats,
StatsLevel level
) {
// make a total common stats from old ones and current ones
this.stats = oldStats;
for (List<IndexShardStats> shardStatsList : statsByShard.values()) {
for (IndexShardStats indexShardStats : shardStatsList) {
for (ShardStats shardStats : indexShardStats.getShards()) {
stats.add(shardStats.getStats());
}
}
}

if (this.stats.search != null) {
this.stats.search.setSearchRequestStats(searchRequestStats);

(Codecov warning: added line server/src/main/java/org/opensearch/indices/NodeIndicesStats.java#L142 not covered by tests)
}

if (level != null) {
switch (level) {
case INDICES:
this.statsByIndex = createStatsByIndex(statsByShard);
break;
case SHARDS:
this.statsByShard = statsByShard;
break;
}
}
}

/**
* The levels passed from the transport action arrive as an array of strings. Since NodeIndicesStats can only
* aggregate on one level, we pick the first level and match it against the {@link StatsLevel} enum; an
* unrecognized level is rejected with an IllegalArgumentException, and null is returned when no level is passed.
*
* Note - we pick only the first level because multiple levels are not supported in earlier versions.
* @param levels - levels sent in the request.
*
* @return The corresponding {@link StatsLevel} enum, or null when no level is provided
*/
public static StatsLevel getAcceptedLevel(String[] levels) {
if (levels != null && levels.length > 0) {
Optional<StatsLevel> level = Arrays.stream(StatsLevel.values())
.filter(field -> field.getRestName().equals(levels[0]))
.findFirst();
return level.orElseThrow(() -> new IllegalArgumentException("Level provided is not supported by NodeIndicesStats"));

(Codecov warning: added lines server/src/main/java/org/opensearch/indices/NodeIndicesStats.java#L169-L172 not covered by tests)
}
return null;

(Codecov warning: added line server/src/main/java/org/opensearch/indices/NodeIndicesStats.java#L174 not covered by tests)
}

private Map<Index, CommonStats> readStatsByIndex(StreamInput in) throws IOException {
Map<Index, CommonStats> statsByIndex = new HashMap<>();
int indexEntries = in.readVInt();
for (int i = 0; i < indexEntries; i++) {
Index index = new Index(in);
CommonStats commonStats = new CommonStats(in);
statsByIndex.put(index, commonStats);
}
return statsByIndex;
}

private Map<Index, List<IndexShardStats>> readStatsByShard(StreamInput in) throws IOException {
Map<Index, List<IndexShardStats>> statsByShard = new HashMap<>();
int entries = in.readVInt();
for (int i = 0; i < entries; i++) {
Index index = new Index(in);
int indexShardListSize = in.readVInt();
List<IndexShardStats> indexShardStats = new ArrayList<>(indexShardListSize);
for (int j = 0; j < indexShardListSize; j++) {
indexShardStats.add(new IndexShardStats(in));
}
statsByShard.put(index, indexShardStats);
}
return statsByShard;
}

@Nullable
public StoreStats getStore() {
return stats.getStore();
@@ -195,7 +283,31 @@
@Override
public void writeTo(StreamOutput out) throws IOException {
stats.writeTo(out);

if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(statsByIndex != null);
if (statsByIndex != null) {
writeStatsByIndex(out);
}
}

out.writeBoolean(statsByShard != null);
if (statsByShard != null) {
writeStatsByShards(out);
}
}

private void writeStatsByIndex(StreamOutput out) throws IOException {
if (statsByIndex != null) {
out.writeVInt(statsByIndex.size());
for (Map.Entry<Index, CommonStats> entry : statsByIndex.entrySet()) {
entry.getKey().writeTo(out);
entry.getValue().writeTo(out);
}
}
}

private void writeStatsByShards(StreamOutput out) throws IOException {
if (statsByShard != null) {
out.writeVInt(statsByShard.size());
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
@@ -210,29 +322,46 @@

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
final String level = params.param("level", "node");
final boolean isLevelValid = "indices".equalsIgnoreCase(level)
|| "node".equalsIgnoreCase(level)
|| "shards".equalsIgnoreCase(level);
final String level = params.param("level", StatsLevel.NODE.getRestName());
final boolean isLevelValid = StatsLevel.NODE.getRestName().equalsIgnoreCase(level)
|| StatsLevel.INDICES.getRestName().equalsIgnoreCase(level)
|| StatsLevel.SHARDS.getRestName().equalsIgnoreCase(level);
if (!isLevelValid) {
throw new IllegalArgumentException("level parameter must be one of [indices] or [node] or [shards] but was [" + level + "]");
throw new IllegalArgumentException(
"level parameter must be one of ["
+ StatsLevel.INDICES.getRestName()
+ "] or ["
+ StatsLevel.NODE.getRestName()
+ "] or ["
+ StatsLevel.SHARDS.getRestName()
+ "] but was ["
+ level
+ "]"
);
}

// "node" level
builder.startObject(Fields.INDICES);
builder.startObject(StatsLevel.INDICES.getRestName());
stats.toXContent(builder, params);

if ("indices".equals(level)) {
Map<Index, CommonStats> indexStats = createStatsByIndex();
builder.startObject(Fields.INDICES);
for (Map.Entry<Index, CommonStats> entry : indexStats.entrySet()) {
if (StatsLevel.INDICES.getRestName().equals(level)) {
assert statsByIndex != null || statsByShard != null : "Expected shard stats or index stats in response for generating ["
+ StatsLevel.INDICES
+ "] field";
if (statsByIndex == null) {
statsByIndex = createStatsByIndex(statsByShard);
}

builder.startObject(StatsLevel.INDICES.getRestName());
for (Map.Entry<Index, CommonStats> entry : statsByIndex.entrySet()) {
builder.startObject(entry.getKey().getName());
entry.getValue().toXContent(builder, params);
builder.endObject();
}
builder.endObject();
} else if ("shards".equals(level)) {
builder.startObject("shards");
} else if (StatsLevel.SHARDS.getRestName().equals(level)) {
builder.startObject(StatsLevel.SHARDS.getRestName());
assert statsByShard != null : "Expected shard stats in response for generating [" + StatsLevel.SHARDS + "] field";
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
builder.startArray(entry.getKey().getName());
for (IndexShardStats indexShardStats : entry.getValue()) {
Expand All @@ -251,7 +380,7 @@
return builder;
}

private Map<Index, CommonStats> createStatsByIndex() {
private Map<Index, CommonStats> createStatsByIndex(Map<Index, List<IndexShardStats>> statsByShard) {
Map<Index, CommonStats> statsMap = new HashMap<>();
for (Map.Entry<Index, List<IndexShardStats>> entry : statsByShard.entrySet()) {
if (!statsMap.containsKey(entry.getKey())) {
@@ -281,7 +410,21 @@
*
* @opensearch.internal
*/
static final class Fields {
static final String INDICES = "indices";
@PublicApi(since = "3.0.0")
public enum StatsLevel {
INDICES("indices"),
SHARDS("shards"),
NODE("node");

private final String restName;

StatsLevel(String restName) {
this.restName = restName;
}

public String getRestName() {
return restName;
}

}
}
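
Since getAcceptedLevel and the StatsLevel enum carry the level-selection logic above, here is a short illustrative sketch of how they resolve level strings. The class names and behavior are taken from this diff; the snippet simply assumes the OpenSearch server module is on the classpath and is not part of the change itself.

```java
import org.opensearch.indices.NodeIndicesStats;
import org.opensearch.indices.NodeIndicesStats.StatsLevel;

public class StatsLevelResolutionSketch {
    public static void main(String[] args) {
        // Only the first entry is considered, so "shards" wins even if "indices" follows it.
        StatsLevel indices = NodeIndicesStats.getAcceptedLevel(new String[] { "indices" });            // StatsLevel.INDICES
        StatsLevel shards = NodeIndicesStats.getAcceptedLevel(new String[] { "shards", "indices" });   // StatsLevel.SHARDS

        // No level at all returns null, which keeps the default node-level aggregation.
        StatsLevel none = NodeIndicesStats.getAcceptedLevel(new String[0]);                            // null

        System.out.println(indices + " / " + shards + " / " + none);

        // An unrecognized level is rejected rather than silently ignored.
        try {
            NodeIndicesStats.getAcceptedLevel(new String[] { "cluster" });
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```
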
@@ -233,6 +233,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
String[] levels = Strings.splitStringByCommaToArray(request.param("level"));
nodesStatsRequest.indices().setLevels(levels);
nodesStatsRequest.setIncludeDiscoveryNodes(false);
nodesStatsRequest.indices().setIncludeIndicesStatsByLevel(true);

return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
}
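
To show how the REST handler change above translates into a transport-level request, here is a hedged sketch that mirrors RestNodesStatsAction. NodesStatsRequest, indices(), setLevels, and setIncludeIndicesStatsByLevel all appear in this PR; the no-arg NodesStatsRequest constructor and the client wiring are assumptions about the surrounding OpenSearch API.

```java
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;

public class OptimizedNodesStatsRequestSketch {
    /**
     * Build a nodes-stats request that, like the REST layer, forwards the requested level
     * and opts in to the pre-aggregated NodeIndicesStats output.
     */
    public static NodesStatsRequest indicesLevelRequest() {
        NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
        nodesStatsRequest.indices().setLevels(new String[] { "indices" });
        nodesStatsRequest.indices().setIncludeIndicesStatsByLevel(true);
        return nodesStatsRequest;
    }
}
```

The request would then be executed the same way the handlers above do, via client.admin().cluster().nodesStats(request, listener).
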
@@ -148,6 +148,7 @@
NodesStatsRequest.Metric.PROCESS.metricName(),
NodesStatsRequest.Metric.SCRIPT.metricName()
);
nodesStatsRequest.indices().setIncludeIndicesStatsByLevel(true);

(Codecov warning: added line server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java#L151 not covered by tests)
client.admin().cluster().nodesStats(nodesStatsRequest, new RestResponseListener<NodesStatsResponse>(channel) {
@Override
public RestResponse buildResponse(NodesStatsResponse nodesStatsResponse) throws Exception {