Skip to content

Commit

Permalink
Node Stats Optmisation Changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Pranshu-S authored and Harsh Garg committed Aug 9, 2024
1 parent 170ea27 commit 062a8d1
Show file tree
Hide file tree
Showing 7 changed files with 897 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@

import org.opensearch.ExceptionsHelper;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
Expand All @@ -19,21 +22,35 @@
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.index.shard.IndexingStats.Stats.DocStatusStats;
import org.opensearch.indices.NodeIndicesStats;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
import org.hamcrest.MatcherAssert;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static java.util.Collections.singletonMap;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

Expand Down Expand Up @@ -243,6 +260,280 @@ public void testNodeIndicesStatsDocStatusStatsCreateDeleteUpdate() {
}
}

public void testNodeIndicesStatsDocStatsWithAggregations() {
{ // Testing Create
final String INDEX = "create_index";
final String ID = "id";
DocStatusStats expectedDocStatusStats = new DocStatusStats();

IndexResponse response = client().index(new IndexRequest(INDEX).id(ID).source(SOURCE).create(true)).actionGet();
expectedDocStatusStats.inc(response.status());

CommonStatsFlags commonStatsFlags = new CommonStatsFlags();
commonStatsFlags.setIncludeIndicesStatsByLevel(true);

DocStatusStats docStatusStats = client().admin()
.cluster()
.prepareNodesStats()
.setIndices(commonStatsFlags)
.execute()
.actionGet()
.getNodes()
.get(0)
.getIndices()
.getIndexing()
.getTotal()
.getDocStatusStats();

assertTrue(
Arrays.equals(
docStatusStats.getDocStatusCounter(),
expectedDocStatusStats.getDocStatusCounter(),
Comparator.comparingLong(AtomicLong::longValue)
)
);
}
}

/**
* Default behavior - without consideration of request level param on level, the NodeStatsRequest always
* returns ShardStats which is aggregated on the coordinator node when creating the XContent.
*/
public void testNodeIndicesStatsXContentWithoutAggregationOnNodes() {
List<String> testLevels = new ArrayList<>();
testLevels.add("null");
testLevels.add(NodeIndicesStats.StatsLevel.NODE.getRestName());
testLevels.add(NodeIndicesStats.StatsLevel.INDICES.getRestName());
testLevels.add(NodeIndicesStats.StatsLevel.SHARDS.getRestName());
testLevels.add("unknown");

internalCluster().startNode();
ensureGreen();
String indexName = "test1";
assertAcked(
prepareCreate(
indexName,
clusterService().state().getNodes().getSize(),
Settings.builder().put("number_of_shards", 2).put("number_of_replicas", clusterService().state().getNodes().getSize() - 1)
)
);
ensureGreen();
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();

testLevels.forEach(testLevel -> {
NodesStatsResponse response;
if (!testLevel.equals("null")) {
ArrayList<String> level_arg = new ArrayList<>();
level_arg.add(testLevel);

CommonStatsFlags commonStatsFlags = new CommonStatsFlags();
commonStatsFlags.setLevels(level_arg.toArray(new String[0]));
response = client().admin().cluster().prepareNodesStats().setIndices(commonStatsFlags).get();
} else {
response = client().admin().cluster().prepareNodesStats().get();
}

NodeStats nodeStats = response.getNodes().get(0);
assertNotNull(nodeStats.getIndices().getShardStats(clusterState.metadata().index(indexName).getIndex()));
try {
// Without any param - default is level = nodes
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
builder = nodeStats.getIndices().toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();

Map<String, Object> xContentMap = xContentBuilderToMap(builder);
LinkedHashMap indicesStatsMap = (LinkedHashMap) xContentMap.get(NodeIndicesStats.StatsLevel.INDICES.getRestName());
assertFalse(indicesStatsMap.containsKey(NodeIndicesStats.StatsLevel.INDICES));
assertFalse(indicesStatsMap.containsKey(NodeIndicesStats.StatsLevel.SHARDS));

// With param containing level as 'indices', the indices stats are returned
builder = XContentFactory.jsonBuilder();
builder.startObject();
builder = nodeStats.getIndices()
.toXContent(
builder,
new ToXContent.MapParams(Collections.singletonMap("level", NodeIndicesStats.StatsLevel.INDICES.getRestName()))
);
builder.endObject();

xContentMap = xContentBuilderToMap(builder);
indicesStatsMap = (LinkedHashMap) xContentMap.get(NodeIndicesStats.StatsLevel.INDICES.getRestName());
assertTrue(indicesStatsMap.containsKey(NodeIndicesStats.StatsLevel.INDICES.getRestName()));
assertFalse(indicesStatsMap.containsKey(NodeIndicesStats.StatsLevel.SHARDS.getRestName()));

LinkedHashMap indexLevelStats = (LinkedHashMap) indicesStatsMap.get(NodeIndicesStats.StatsLevel.INDICES.getRestName());
assertTrue(indexLevelStats.containsKey(indexName));

// With param containing level as 'shards', the shard stats are returned
builder = XContentFactory.jsonBuilder();
builder.startObject();
builder = nodeStats.getIndices()
.toXContent(
builder,
new ToXContent.MapParams(Collections.singletonMap("level", NodeIndicesStats.StatsLevel.SHARDS.getRestName()))
);
builder.endObject();

xContentMap = xContentBuilderToMap(builder);
indicesStatsMap = (LinkedHashMap) xContentMap.get(NodeIndicesStats.StatsLevel.INDICES.getRestName());
assertFalse(indicesStatsMap.containsKey(NodeIndicesStats.StatsLevel.INDICES.getRestName()));
assertTrue(indicesStatsMap.containsKey(NodeIndicesStats.StatsLevel.SHARDS.getRestName()));

LinkedHashMap shardLevelStats = (LinkedHashMap) indicesStatsMap.get(NodeIndicesStats.StatsLevel.SHARDS.getRestName());
assertTrue(shardLevelStats.containsKey(indexName));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}

/**
* Aggregated behavior - to avoid unnecessary IO in the form of shard-stats when not required, we not honor the levels on the
* individual data nodes instead and pre-compute information as required.
*/
public void testNodeIndicesStatsXContentWithAggregationOnNodes() {
List<MockStatsLevel> testLevels = new ArrayList<>();

testLevels.add(MockStatsLevel.NULL);
testLevels.add(MockStatsLevel.NODE);
testLevels.add(MockStatsLevel.INDICES);
testLevels.add(MockStatsLevel.SHARDS);

internalCluster().startNode();
ensureGreen();
String indexName = "test1";
assertAcked(
prepareCreate(
indexName,
clusterService().state().getNodes().getSize(),
Settings.builder().put("number_of_shards", 2).put("number_of_replicas", clusterService().state().getNodes().getSize() - 1)
)
);
ensureGreen();

testLevels.forEach(testLevel -> {
NodesStatsResponse response;
CommonStatsFlags commonStatsFlags = new CommonStatsFlags();
commonStatsFlags.setIncludeIndicesStatsByLevel(true);
if (!testLevel.equals(MockStatsLevel.NULL)) {
ArrayList<String> level_arg = new ArrayList<>();
level_arg.add(testLevel.getRestName());

commonStatsFlags.setLevels(level_arg.toArray(new String[0]));
}
response = client().admin().cluster().prepareNodesStats().setIndices(commonStatsFlags).get();

NodeStats nodeStats = response.getNodes().get(0);
try {
XContentBuilder builder = XContentFactory.jsonBuilder();

builder.startObject();

if (!testLevel.equals(MockStatsLevel.SHARDS)) {
final XContentBuilder failedBuilder = builder;
assertThrows(
"Expected shard stats in response for generating [SHARDS] field",
AssertionError.class,
() -> nodeStats.getIndices()
.toXContent(
failedBuilder,
new ToXContent.MapParams(
Collections.singletonMap("level", NodeIndicesStats.StatsLevel.SHARDS.getRestName())
)
)
);
} else {
builder = nodeStats.getIndices()
.toXContent(
builder,
new ToXContent.MapParams(Collections.singletonMap("level", NodeIndicesStats.StatsLevel.SHARDS.getRestName()))
);
builder.endObject();

Map<String, Object> xContentMap = xContentBuilderToMap(builder);
LinkedHashMap indicesStatsMap = (LinkedHashMap) xContentMap.get(NodeIndicesStats.StatsLevel.INDICES.getRestName());
LinkedHashMap indicesStats = (LinkedHashMap) indicesStatsMap.get(NodeIndicesStats.StatsLevel.INDICES.getRestName());
LinkedHashMap shardStats = (LinkedHashMap) indicesStatsMap.get(NodeIndicesStats.StatsLevel.SHARDS.getRestName());

assertFalse(shardStats.isEmpty());
assertNull(indicesStats);
}

builder = XContentFactory.jsonBuilder();
builder.startObject();

if (!(testLevel.equals(MockStatsLevel.SHARDS) || testLevel.equals(MockStatsLevel.INDICES))) {
final XContentBuilder failedBuilder = builder;
assertThrows(
"Expected shard stats or index stats in response for generating INDICES field",
AssertionError.class,
() -> nodeStats.getIndices()
.toXContent(
failedBuilder,
new ToXContent.MapParams(
Collections.singletonMap("level", NodeIndicesStats.StatsLevel.INDICES.getRestName())
)
)
);
} else {
builder = nodeStats.getIndices()
.toXContent(
builder,
new ToXContent.MapParams(Collections.singletonMap("level", NodeIndicesStats.StatsLevel.INDICES.getRestName()))
);
builder.endObject();

Map<String, Object> xContentMap = xContentBuilderToMap(builder);
LinkedHashMap indicesStatsMap = (LinkedHashMap) xContentMap.get(NodeIndicesStats.StatsLevel.INDICES.getRestName());
LinkedHashMap indicesStats = (LinkedHashMap) indicesStatsMap.get(NodeIndicesStats.StatsLevel.INDICES.getRestName());
LinkedHashMap shardStats = (LinkedHashMap) indicesStatsMap.get(NodeIndicesStats.StatsLevel.SHARDS.getRestName());

switch (testLevel) {
case SHARDS:
case INDICES:
assertNull(shardStats);
assertFalse(indicesStats.isEmpty());
break;
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}

public void testNodeIndicesStatsUnknownLevelThrowsException() {
MockStatsLevel testLevel = MockStatsLevel.UNKNOWN;
internalCluster().startNode();
ensureGreen();
String indexName = "test1";
assertAcked(
prepareCreate(
indexName,
clusterService().state().getNodes().getSize(),
Settings.builder().put("number_of_shards", 2).put("number_of_replicas", clusterService().state().getNodes().getSize() - 1)
)
);
ensureGreen();

NodesStatsResponse response;
CommonStatsFlags commonStatsFlags = new CommonStatsFlags();
commonStatsFlags.setIncludeIndicesStatsByLevel(true);
ArrayList<String> level_arg = new ArrayList<>();
level_arg.add(testLevel.getRestName());

commonStatsFlags.setLevels(level_arg.toArray(new String[0]));
response = client().admin().cluster().prepareNodesStats().setIndices(commonStatsFlags).get();

assertTrue(response.hasFailures());
assertEquals("Level provided is not supported by NodeIndicesStats", response.failures().get(0).getCause().getCause().getMessage());
}

private Map<String, Object> xContentBuilderToMap(XContentBuilder xContentBuilder) {
return XContentHelper.convertToMap(BytesReference.bytes(xContentBuilder), true, xContentBuilder.contentType()).v2();
}

private void assertDocStatusStats() {
DocStatusStats docStatusStats = client().admin()
.cluster()
Expand Down Expand Up @@ -273,4 +564,22 @@ private void updateExpectedDocStatusCounter(Exception e) {
expectedDocStatusStats.inc(ExceptionsHelper.status(e));
}

private enum MockStatsLevel {
INDICES(NodeIndicesStats.StatsLevel.INDICES.getRestName()),
SHARDS(NodeIndicesStats.StatsLevel.SHARDS.getRestName()),
NODE(NodeIndicesStats.StatsLevel.NODE.getRestName()),
NULL("null"),
UNKNOWN("unknown");

private final String restName;

MockStatsLevel(String restName) {
this.restName = restName;
}

public String getRestName() {
return restName;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class CommonStatsFlags implements Writeable, Cloneable {
// Used for metric CACHE_STATS, to determine which caches to report stats for
private EnumSet<CacheType> includeCaches = EnumSet.noneOf(CacheType.class);
private String[] levels = new String[0];
private boolean includeIndicesStatsByLevel = false;

/**
* @param flags flags to set. If no flags are supplied, default flags will be set.
Expand Down Expand Up @@ -100,6 +101,9 @@ public CommonStatsFlags(StreamInput in) throws IOException {
includeCaches = in.readEnumSet(CacheType.class);
levels = in.readStringArray();
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
includeIndicesStatsByLevel = in.readBoolean();
}
}

@Override
Expand All @@ -124,6 +128,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeEnumSet(includeCaches);
out.writeStringArrayNullable(levels);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(includeIndicesStatsByLevel);
}
}

/**
Expand Down Expand Up @@ -262,6 +269,14 @@ public boolean includeSegmentFileSizes() {
return this.includeSegmentFileSizes;
}

public void setIncludeIndicesStatsByLevel(boolean includeIndicesStatsByLevel) {
this.includeIndicesStatsByLevel = includeIndicesStatsByLevel;
}

public boolean getIncludeIndicesStatsByLevel() {
return this.includeIndicesStatsByLevel;
}

public boolean isSet(Flag flag) {
return flags.contains(flag);
}
Expand Down
Loading

0 comments on commit 062a8d1

Please sign in to comment.