Skip to content

Commit

Permalink
Add shard indexing pressure metric/stats via rest end point. (#1171)
Browse files Browse the repository at this point in the history
* Add shard indexing pressure metric/stats via rest end point.

Signed-off-by: Saurabh Singh <sisurab@amazon.com>
  • Loading branch information
getsaurabh02 authored and adnapibar committed Sep 15, 2021
1 parent 6e310b1 commit 700a5f5
Show file tree
Hide file tree
Showing 12 changed files with 352 additions and 15 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.http.HttpStats;
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.index.stats.IndexingPressureStats;
import org.opensearch.index.stats.ShardIndexingPressureStats;
import org.opensearch.indices.NodeIndicesStats;
import org.opensearch.indices.breaker.AllCircuitBreakerStats;
import org.opensearch.ingest.IngestStats;
Expand Down Expand Up @@ -112,6 +114,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private IndexingPressureStats indexingPressureStats;

@Nullable
private ShardIndexingPressureStats shardIndexingPressureStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -147,6 +152,12 @@ public NodeStats(StreamInput in) throws IOException {
} else {
indexingPressureStats = null;
}
if (ShardIndexingPressureSettings.isShardIndexingPressureAttributeEnabled()) {
shardIndexingPressureStats = in.readOptionalWriteable(ShardIndexingPressureStats::new);
} else {
shardIndexingPressureStats = null;
}

}

public NodeStats(DiscoveryNode node, long timestamp, @Nullable NodeIndicesStats indices,
Expand All @@ -158,7 +169,8 @@ public NodeStats(DiscoveryNode node, long timestamp, @Nullable NodeIndicesStats
@Nullable IngestStats ingestStats,
@Nullable AdaptiveSelectionStats adaptiveSelectionStats,
@Nullable ScriptCacheStats scriptCacheStats,
@Nullable IndexingPressureStats indexingPressureStats) {
@Nullable IndexingPressureStats indexingPressureStats,
@Nullable ShardIndexingPressureStats shardIndexingPressureStats) {
super(node);
this.timestamp = timestamp;
this.indices = indices;
Expand All @@ -176,6 +188,7 @@ public NodeStats(DiscoveryNode node, long timestamp, @Nullable NodeIndicesStats
this.adaptiveSelectionStats = adaptiveSelectionStats;
this.scriptCacheStats = scriptCacheStats;
this.indexingPressureStats = indexingPressureStats;
this.shardIndexingPressureStats = shardIndexingPressureStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -280,6 +293,11 @@ public IndexingPressureStats getIndexingPressureStats() {
return indexingPressureStats;
}

@Nullable
public ShardIndexingPressureStats getShardIndexingPressureStats() {
return shardIndexingPressureStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -309,6 +327,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(LegacyESVersion.V_7_9_0)) {
out.writeOptionalWriteable(indexingPressureStats);
}
if (ShardIndexingPressureSettings.isShardIndexingPressureAttributeEnabled()) {
out.writeOptionalWriteable(shardIndexingPressureStats);
}
}

@Override
Expand Down Expand Up @@ -378,6 +399,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getIndexingPressureStats() != null) {
getIndexingPressureStats().toXContent(builder, params);
}
if (getShardIndexingPressureStats() != null) {
getShardIndexingPressureStats().toXContent(builder, params);
}
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ public enum Metric {
INGEST("ingest"),
ADAPTIVE_SELECTION("adaptive_selection"),
SCRIPT_CACHE("script_cache"),
INDEXING_PRESSURE("indexing_pressure"),;
INDEXING_PRESSURE("indexing_pressure"),
SHARD_INDEXING_PRESSURE("shard_indexing_pressure");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.INGEST.containedIn(metrics),
NodesStatsRequest.Metric.ADAPTIVE_SELECTION.containedIn(metrics),
NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics),
NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics));
NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.SHARD_INDEXING_PRESSURE.containedIn(metrics));
}

public static class NodeStatsRequest extends BaseNodeRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ protected ClusterStatsNodeResponse newNodeResponse(StreamInput in) throws IOExce
protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest) {
NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false, false);
NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE,
true, true, true, false, true, false, false, false, false, false, true, false, false, false);
true, true, true, false, true, false, false, false, false, false, true, false, false, false, false);
List<ShardStats> shardsStats = new ArrayList<>();
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
Expand Down
6 changes: 6 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@
import java.util.stream.Stream;

import static java.util.stream.Collectors.toList;
import static org.opensearch.index.ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY;

/**
* A node represent a node within a cluster ({@code cluster.name}). The {@link #client()} can be used
Expand Down Expand Up @@ -317,6 +318,11 @@ protected Node(final Environment initialEnvironment,
Settings tmpSettings = Settings.builder().put(initialEnvironment.settings())
.put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();

// Enabling shard indexing backpressure node-attribute
tmpSettings = Settings.builder().put(tmpSettings)
.put(NODE_ATTRIBUTES.getKey() + SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY, "true")
.build();

final JvmInfo jvmInfo = JvmInfo.jvmInfo();
logger.info(
"version[{}], pid[{}], build[{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
Expand Down
5 changes: 3 additions & 2 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public NodeInfo info(boolean settings, boolean os, boolean process, boolean jvm,
public NodeStats stats(CommonStatsFlags indices, boolean os, boolean process, boolean jvm, boolean threadPool,
boolean fs, boolean transport, boolean http, boolean circuitBreaker,
boolean script, boolean discoveryStats, boolean ingest, boolean adaptiveSelection, boolean scriptCache,
boolean indexingPressure) {
boolean indexingPressure, boolean shardIndexingPressure) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
return new NodeStats(transportService.getLocalNode(), System.currentTimeMillis(),
Expand All @@ -143,7 +143,8 @@ public NodeStats stats(CommonStatsFlags indices, boolean os, boolean process, bo
ingest ? ingestService.stats() : null,
adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null,
scriptCache ? scriptService.cacheStats() : null,
indexingPressure ? this.indexingPressureService.nodeStats() : null
indexingPressure ? this.indexingPressureService.nodeStats() : null,
shardIndexingPressure ? this.indexingPressureService.shardStats(indices) : null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,13 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
if (nodesStatsRequest.indices().isSet(Flag.Segments)) {
nodesStatsRequest.indices().includeSegmentFileSizes(request.paramAsBoolean("include_segment_file_sizes", false));
}
if (request.hasParam("include_all")) {
nodesStatsRequest.indices().includeAllShardIndexingPressureTrackers(request.paramAsBoolean("include_all", false));
}

if (request.hasParam("top")) {
nodesStatsRequest.indices().includeOnlyTopIndexingPressureMetrics(request.paramAsBoolean("top", false));
}

return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ public static NodeStats createNodeStats() {
//TODO NodeIndicesStats are not tested here, way too complicated to create, also they need to be migrated to Writeable yet
return new NodeStats(node, randomNonNegativeLong(), null, osStats, processStats, jvmStats, threadPoolStats,
fsInfo, transportStats, httpStats, allCircuitBreakerStats, scriptStats, discoveryStats,
ingestStats, adaptiveSelectionStats, scriptCacheStats, null);
ingestStats, adaptiveSelectionStats, scriptCacheStats, null, null);
}

private IngestStats.Stats getPipelineStats(List<IngestStats.PipelineStat> pipelineStats, String id) {
Expand Down
12 changes: 6 additions & 6 deletions server/src/test/java/org/opensearch/cluster/DiskUsageTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,13 @@ public void testFillDiskUsage() {
List<NodeStats> nodeStats = Arrays.asList(
new NodeStats(new DiscoveryNode("node_1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0,
null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null, null,
null, null),
null, null, null),
new NodeStats(new DiscoveryNode("node_2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0,
null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null, null,
null, null),
null, null, null),
new NodeStats(new DiscoveryNode("node_3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0,
null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null, null,
null, null)
null, null, null)
);
InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvaiableUsages, newMostAvaiableUsages);
DiskUsage leastNode_1 = newLeastAvaiableUsages.get("node_1");
Expand Down Expand Up @@ -210,13 +210,13 @@ public void testFillDiskUsageSomeInvalidValues() {
List<NodeStats> nodeStats = Arrays.asList(
new NodeStats(new DiscoveryNode("node_1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0,
null,null,null,null,null,new FsInfo(0, null, node1FSInfo), null,null,null,null,null, null, null,
null, null),
null, null, null),
new NodeStats(new DiscoveryNode("node_2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0,
null,null,null,null,null, new FsInfo(0, null, node2FSInfo), null,null,null,null,null, null, null,
null, null),
null, null, null),
new NodeStats(new DiscoveryNode("node_3", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), 0,
null,null,null,null,null, new FsInfo(0, null, node3FSInfo), null,null,null,null,null, null, null,
null, null)
null, null, null)
);
InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvailableUsages, newMostAvailableUsages);
DiskUsage leastNode_1 = newLeastAvailableUsages.get("node_1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ List<NodeStats> adjustNodesStats(List<NodeStats> nodesStats) {
.toArray(FsInfo.Path[]::new)), nodeStats.getTransport(),
nodeStats.getHttp(), nodeStats.getBreaker(), nodeStats.getScriptStats(), nodeStats.getDiscoveryStats(),
nodeStats.getIngestStats(), nodeStats.getAdaptiveSelectionStats(), nodeStats.getScriptCacheStats(),
nodeStats.getIndexingPressureStats());
nodeStats.getIndexingPressureStats(), nodeStats.getShardIndexingPressureStats());
}).collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2546,7 +2546,7 @@ public void ensureEstimatedStats() {
NodeService nodeService = getInstanceFromNode(NodeService.class, nodeAndClient.node);
CommonStatsFlags flags = new CommonStatsFlags(Flag.FieldData, Flag.QueryCache, Flag.Segments);
NodeStats stats = nodeService.stats(flags,
false, false, false, false, false, false, false, false, false, false, false, false, false, false);
false, false, false, false, false, false, false, false, false, false, false, false, false, false, false);
assertThat("Fielddata size must be 0 on node: " + stats.getNode(),
stats.getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L));
assertThat("Query cache size must be 0 on node: " + stats.getNode(),
Expand Down

0 comments on commit 700a5f5

Please sign in to comment.