Skip to content

Commit

Permalink
Add shard indexing pressure metric/stats via rest end point. (#478)
Browse files Browse the repository at this point in the history
Signed-off-by: Saurabh Singh <sisurab@amazon.com>
  • Loading branch information
getsaurabh02 committed Apr 5, 2021
1 parent 8ab960a commit 657c705
Show file tree
Hide file tree
Showing 11 changed files with 296 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
/*
* Copyright OpenSearch Contributors.
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.http;

import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.index.IndexingPressure;
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.XContentTestUtils;

import java.util.ArrayList;
import java.util.Map;

import static org.opensearch.rest.RestStatus.CREATED;
import static org.opensearch.rest.RestStatus.OK;
import static org.opensearch.rest.RestStatus.TOO_MANY_REQUESTS;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;

/**
* Test Shard Indexing Pressure Metrics and Statistics
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2,
numClientNodes = 0)
public class ShardIndexingPressureRestIT extends HttpSmokeTestCase {

private static final Settings unboundedWriteQueue = Settings.builder().put("thread_pool.write.queue_size", -1).build();

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(IndexingPressure.MAX_INDEXING_BYTES.getKey(), "1KB")
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENFORCED.getKey(), true)
.put(unboundedWriteQueue)
.build();
}

@SuppressWarnings("unchecked")
public void testShardIndexingPressureStats() throws Exception {
Request createRequest = new Request("PUT", "/index_name");
createRequest.setJsonEntity("{\"settings\": {\"index\": {\"number_of_shards\": 1, \"number_of_replicas\": 1, " +
"\"write.wait_for_active_shards\": 2}}}");
final Response indexCreatedResponse = getRestClient().performRequest(createRequest);
assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));

Request successfulIndexingRequest = new Request("POST", "/index_name/_doc/");
successfulIndexingRequest.setJsonEntity("{\"x\": \"small text\"}");
final Response indexSuccessFul = getRestClient().performRequest(successfulIndexingRequest);
assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(CREATED.getStatus()));

Request getShardStats1 = new Request("GET", "/_nodes/stats/shard_indexing_pressure?include_all");
final Response shardStats1 = getRestClient().performRequest(getShardStats1);
Map<String, Object> shardStatsMap1 = XContentHelper.convertToMap(JsonXContent.jsonXContent, shardStats1.getEntity().getContent(),
true);
ArrayList<Object> values1 = new ArrayList<>(((Map<Object, Object>) shardStatsMap1.get("nodes")).values());
assertThat(values1.size(), equalTo(2));
XContentTestUtils.JsonMapView node1 = new XContentTestUtils.JsonMapView((Map<String, Object>) values1.get(0));
ArrayList<Object> shard1IndexingPressureValues = new ArrayList<>(((Map<Object, Object>) node1.get("shard_indexing_pressure.stats"))
.values());
assertThat(shard1IndexingPressureValues.size(), equalTo(1));
XContentTestUtils.JsonMapView shard1 = new XContentTestUtils.JsonMapView((Map<String, Object>) shard1IndexingPressureValues.get(0));
Integer node1TotalLimitsRejections = node1.get("shard_indexing_pressure.total_rejections_breakup.node_limits");
Integer shard1CoordinatingBytes = shard1.get("memory.total.coordinating_in_bytes");
Integer shard1PrimaryBytes = shard1.get("memory.total.primary_in_bytes");
Integer shard1ReplicaBytes = shard1.get("memory.total.replica_in_bytes");
Integer shard1CoordinatingRejections = shard1.get("rejection.coordinating.coordinating_rejections");
Integer shard1PrimaryRejections = shard1.get("rejection.primary.primary_rejections");
Integer shard1CoordinatingNodeRejections = shard1.get("rejection.coordinating.breakup.node_limits");

XContentTestUtils.JsonMapView node2 = new XContentTestUtils.JsonMapView((Map<String, Object>) values1.get(1));
ArrayList<Object> shard2IndexingPressureValues = new ArrayList<>(((Map<Object, Object>) node2.get("shard_indexing_pressure.stats"))
.values());
assertThat(shard2IndexingPressureValues.size(), equalTo(1));
XContentTestUtils.JsonMapView shard2 = new XContentTestUtils.JsonMapView((Map<String, Object>) shard2IndexingPressureValues.get(0));
Integer node2TotalLimitsRejections = node2.get("shard_indexing_pressure.total_rejections_breakup.node_limits");
Integer shard2CoordinatingBytes = shard2.get("memory.total.coordinating_in_bytes");
Integer shard2PrimaryBytes = shard2.get("memory.total.primary_in_bytes");
Integer shard2ReplicaBytes = shard2.get("memory.total.replica_in_bytes");
Integer shard2CoordinatingRejections = shard2.get("rejection.coordinating.coordinating_rejections");
Integer shard2PrimaryRejections = shard2.get("rejection.primary.primary_rejections");
Integer shard2CoordinatingNodeRejections = shard2.get("rejection.coordinating.breakup.node_limits");


if (shard1CoordinatingBytes == 0) {
assertThat(shard2CoordinatingBytes, greaterThan(0));
assertThat(shard2CoordinatingBytes, lessThan(1024));
} else {
assertThat(shard1CoordinatingBytes, greaterThan(0));
assertThat(shard1CoordinatingBytes, lessThan(1024));
}

if (shard1ReplicaBytes == 0) {
assertThat(shard1PrimaryBytes, greaterThan(0));
assertThat(shard1PrimaryBytes, lessThan(1024));

assertThat(shard2ReplicaBytes, greaterThan(0));
assertThat(shard2ReplicaBytes, lessThan(1024));
} else {
assertThat(shard2PrimaryBytes, greaterThan(0));
assertThat(shard2PrimaryBytes, lessThan(1024));

assertThat(shard2ReplicaBytes, equalTo(0));
assertThat(shard1ReplicaBytes, lessThan(1024));
}

assertThat(shard1CoordinatingRejections, equalTo(0));
assertThat(shard1PrimaryRejections, equalTo(0));
assertThat(shard2CoordinatingRejections, equalTo(0));
assertThat(shard2PrimaryRejections, equalTo(0));
assertThat(shard1CoordinatingNodeRejections, equalTo(0));
assertThat(node1TotalLimitsRejections, equalTo(0));
assertThat(node2TotalLimitsRejections, equalTo(0));

Request getNodeStats = new Request("GET", "/_nodes/stats/indexing_pressure");
final Response nodeStats = getRestClient().performRequest(getNodeStats);
Map<String, Object> nodeStatsMap = XContentHelper.convertToMap(JsonXContent.jsonXContent, nodeStats.getEntity().getContent(), true);
values1 = new ArrayList<>(((Map<Object, Object>) nodeStatsMap.get("nodes")).values());
assertThat(values1.size(), equalTo(2));
node1 = new XContentTestUtils.JsonMapView((Map<String, Object>) values1.get(0));
Integer node1CombinedBytes = node1.get("indexing_pressure.memory.total.combined_coordinating_and_primary_in_bytes");
Integer node1PrimaryBytes = node1.get("indexing_pressure.memory.total.primary_in_bytes");
Integer node1ReplicaBytes = node1.get("indexing_pressure.memory.total.replica_in_bytes");
Integer node1CoordinatingRejections = node1.get("indexing_pressure.memory.total.coordinating_rejections");
Integer node1PrimaryRejections = node1.get("indexing_pressure.memory.total.primary_rejections");
node2 = new XContentTestUtils.JsonMapView((Map<String, Object>) values1.get(1));
Integer node2IndexingBytes = node2.get("indexing_pressure.memory.total.combined_coordinating_and_primary_in_bytes");
Integer node2PrimaryBytes = node2.get("indexing_pressure.memory.total.primary_in_bytes");
Integer node2ReplicaBytes = node2.get("indexing_pressure.memory.total.replica_in_bytes");
Integer node2CoordinatingRejections = node2.get("indexing_pressure.memory.total.coordinating_rejections");
Integer node2PrimaryRejections = node2.get("indexing_pressure.memory.total.primary_rejections");

if (node1CombinedBytes == 0) {
assertThat(node2IndexingBytes, greaterThan(0));
assertThat(node2IndexingBytes, lessThan(1024));
} else {
assertThat(node1CombinedBytes, greaterThan(0));
assertThat(node1CombinedBytes, lessThan(1024));
}

if (node1ReplicaBytes == 0) {
assertThat(node1PrimaryBytes, greaterThan(0));
assertThat(node1PrimaryBytes, lessThan(1024));

assertThat(node2ReplicaBytes, greaterThan(0));
assertThat(node2ReplicaBytes, lessThan(1024));
} else {
assertThat(node2PrimaryBytes, greaterThan(0));
assertThat(node2PrimaryBytes, lessThan(1024));

assertThat(node2ReplicaBytes, equalTo(0));
assertThat(node1ReplicaBytes, lessThan(1024));
}

assertThat(node1CoordinatingRejections, equalTo(0));
assertThat(node1PrimaryRejections, equalTo(0));
assertThat(node2CoordinatingRejections, equalTo(0));
assertThat(node2PrimaryRejections, equalTo(0));

Request failedIndexingRequest = new Request("POST", "/index_name/_doc/");
String largeString = randomAlphaOfLength(10000);
failedIndexingRequest.setJsonEntity("{\"x\": " + largeString + "}");
ResponseException exception = expectThrows(ResponseException.class, () -> getRestClient().performRequest(failedIndexingRequest));
assertThat(exception.getResponse().getStatusLine().getStatusCode(), equalTo(TOO_MANY_REQUESTS.getStatus()));

Request getShardStats2 = new Request("GET", "/_nodes/stats/shard_indexing_pressure?include_all");
final Response shardStats2 = getRestClient().performRequest(getShardStats2);
Map<String, Object> shardStatsMap2 = XContentHelper.convertToMap(JsonXContent.jsonXContent, shardStats2.getEntity().getContent(),
true);
ArrayList<Object> values2 = new ArrayList<>(((Map<Object, Object>) shardStatsMap2.get("nodes")).values());
assertThat(values2.size(), equalTo(2));
XContentTestUtils.JsonMapView node1AfterRejection = new XContentTestUtils.JsonMapView((Map<String, Object>) values2.get(0));
ArrayList<Object> shard1IndexingPressureValuesAfterRejection = new ArrayList<>(((Map<Object, Object>) node1AfterRejection
.get("shard_indexing_pressure.stats")).values());
assertThat(shard1IndexingPressureValuesAfterRejection.size(), equalTo(1));
XContentTestUtils.JsonMapView shard1AfterRejection = new XContentTestUtils
.JsonMapView((Map<String, Object>) shard1IndexingPressureValuesAfterRejection.get(0));
node1TotalLimitsRejections = node1AfterRejection.get("shard_indexing_pressure.total_rejections_breakup.node_limits");
shard1CoordinatingRejections = shard1AfterRejection.get("rejection.coordinating.coordinating_rejections");
shard1PrimaryRejections = shard1AfterRejection.get("rejection.primary.primary_rejections");
shard1CoordinatingNodeRejections = shard1AfterRejection.get("rejection.coordinating.breakup.node_limits");

XContentTestUtils.JsonMapView node2AfterRejection = new XContentTestUtils.JsonMapView((Map<String, Object>) values2.get(1));
ArrayList<Object> shard2IndexingPressureValuesAfterRejection = new ArrayList<>(((Map<Object, Object>) node2AfterRejection
.get("shard_indexing_pressure.stats")).values());
assertThat(shard2IndexingPressureValuesAfterRejection.size(), equalTo(1));
XContentTestUtils.JsonMapView shard2AfterRejection = new XContentTestUtils.JsonMapView((Map<String, Object>)
shard2IndexingPressureValuesAfterRejection.get(0));
node2TotalLimitsRejections = node2AfterRejection.get("shard_indexing_pressure.total_rejections_breakup.node_limits");
shard2CoordinatingRejections = shard2AfterRejection.get("rejection.coordinating.coordinating_rejections");
shard2PrimaryRejections = shard2AfterRejection.get("rejection.primary.primary_rejections");
shard2CoordinatingNodeRejections = shard2AfterRejection.get("rejection.coordinating.breakup.node_limits");

if (shard1CoordinatingRejections == 0) {
assertThat(shard2CoordinatingRejections, equalTo(1));
assertThat(shard2CoordinatingNodeRejections, equalTo(1));
assertThat(node2TotalLimitsRejections, equalTo(1));
} else {
assertThat(shard1CoordinatingRejections, equalTo(1));
assertThat(shard1CoordinatingNodeRejections, equalTo(1));
assertThat(node1TotalLimitsRejections, equalTo(1));
}

assertThat(shard1PrimaryRejections, equalTo(0));
assertThat(shard2PrimaryRejections, equalTo(0));

Request getNodeStats2 = new Request("GET", "/_nodes/stats/indexing_pressure");
final Response nodeStats2 = getRestClient().performRequest(getNodeStats2);
Map<String, Object> nodeStatsMap2 = XContentHelper.convertToMap(JsonXContent.jsonXContent, nodeStats2.getEntity().getContent(),
true);
values2 = new ArrayList<>(((Map<Object, Object>) nodeStatsMap2.get("nodes")).values());
assertThat(values2.size(), equalTo(2));
node1AfterRejection = new XContentTestUtils.JsonMapView((Map<String, Object>) values2.get(0));
node1CoordinatingRejections = node1AfterRejection.get("indexing_pressure.memory.total.coordinating_rejections");
node1PrimaryRejections = node1.get("indexing_pressure.memory.total.primary_rejections");
node2AfterRejection = new XContentTestUtils.JsonMapView((Map<String, Object>) values2.get(1));
node2CoordinatingRejections = node2AfterRejection.get("indexing_pressure.memory.total.coordinating_rejections");
node2PrimaryRejections = node2AfterRejection.get("indexing_pressure.memory.total.primary_rejections");

if (node1CoordinatingRejections == 0) {
assertThat(node2CoordinatingRejections, equalTo(1));
} else {
assertThat(node1CoordinatingRejections, equalTo(1));
}

assertThat(node1PrimaryRejections, equalTo(0));
assertThat(node2PrimaryRejections, equalTo(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.opensearch.script.ScriptStats;
import org.opensearch.threadpool.ThreadPoolStats;
import org.opensearch.transport.TransportStats;
import org.opensearch.index.ShardIndexingPressure;
import org.opensearch.index.stats.ShardIndexingPressureStats;

import java.io.IOException;
import java.util.Map;
Expand Down Expand Up @@ -99,6 +101,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private IndexingPressureStats indexingPressureStats;

@Nullable
private ShardIndexingPressureStats shardIndexingPressureStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -134,6 +139,15 @@ public NodeStats(StreamInput in) throws IOException {
} else {
indexingPressureStats = null;
}
if (in.getVersion().onOrAfter(Version.V_7_10_2)) {
shardIndexingPressureStats = in.readOptionalWriteable(ShardIndexingPressureStats::new);
} else if (in.getVersion().onOrAfter(Version.V_7_9_0)) {
if (ShardIndexingPressure.isShardIndexingPressureAttributeEnabled()) {
shardIndexingPressureStats = in.readOptionalWriteable(ShardIndexingPressureStats::new);
}
} else {
shardIndexingPressureStats = null;
}
}

public NodeStats(DiscoveryNode node, long timestamp, @Nullable NodeIndicesStats indices,
Expand All @@ -145,7 +159,8 @@ public NodeStats(DiscoveryNode node, long timestamp, @Nullable NodeIndicesStats
@Nullable IngestStats ingestStats,
@Nullable AdaptiveSelectionStats adaptiveSelectionStats,
@Nullable ScriptCacheStats scriptCacheStats,
@Nullable IndexingPressureStats indexingPressureStats) {
@Nullable IndexingPressureStats indexingPressureStats,
@Nullable ShardIndexingPressureStats shardIndexingPressureStats) {
super(node);
this.timestamp = timestamp;
this.indices = indices;
Expand All @@ -163,6 +178,7 @@ public NodeStats(DiscoveryNode node, long timestamp, @Nullable NodeIndicesStats
this.adaptiveSelectionStats = adaptiveSelectionStats;
this.scriptCacheStats = scriptCacheStats;
this.indexingPressureStats = indexingPressureStats;
this.shardIndexingPressureStats = shardIndexingPressureStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -267,6 +283,11 @@ public IndexingPressureStats getIndexingPressureStats() {
return indexingPressureStats;
}

@Nullable
public ShardIndexingPressureStats getShardIndexingPressureStats() {
return shardIndexingPressureStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -296,6 +317,13 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
out.writeOptionalWriteable(indexingPressureStats);
}
if (out.getVersion().onOrAfter(Version.V_7_10_2)) {
out.writeOptionalWriteable(shardIndexingPressureStats);
} else if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
if (ShardIndexingPressure.isShardIndexingPressureAttributeEnabled()) {
out.writeOptionalWriteable(shardIndexingPressureStats);
}
}
}

@Override
Expand Down Expand Up @@ -365,6 +393,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getIndexingPressureStats() != null) {
getIndexingPressureStats().toXContent(builder, params);
}
if (getShardIndexingPressureStats() != null) {
getShardIndexingPressureStats().toXContent(builder, params);
}
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ public enum Metric {
INGEST("ingest"),
ADAPTIVE_SELECTION("adaptive_selection"),
SCRIPT_CACHE("script_cache"),
INDEXING_PRESSURE("indexing_pressure"),;
INDEXING_PRESSURE("indexing_pressure"),
SHARD_INDEXING_PRESSURE("shard_indexing_pressure");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.INGEST.containedIn(metrics),
NodesStatsRequest.Metric.ADAPTIVE_SELECTION.containedIn(metrics),
NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics),
NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics));
NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.SHARD_INDEXING_PRESSURE.containedIn(metrics));
}

public static class NodeStatsRequest extends BaseNodeRequest {
Expand Down
Loading

0 comments on commit 657c705

Please sign in to comment.