Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cluster manager task throttling stats in nodes stats API #5790

Merged
merged 9 commits into from
Jan 17, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added support to apply index create block ([#4603](https://github.com/opensearch-project/OpenSearch/issues/4603))
- Support request level durability for remote-backed indexes ([#5671](https://github.com/opensearch-project/OpenSearch/issues/5671))
- Added new level to get health per awareness attribute in _cluster/health ([#5694](https://github.com/opensearch-project/OpenSearch/pull/5694))
- Added cluster manager throttling stats in nodes/stats API ([#5790](https://github.com/opensearch-project/OpenSearch/pull/5790))

### Dependencies
- Bumps `bcpg-fips` from 1.0.5.1 to 1.0.7.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.service.ClusterManagerThrottlingStats;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -122,6 +123,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private SearchBackpressureStats searchBackpressureStats;

@Nullable
private ClusterManagerThrottlingStats clusterManagerThrottlingStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -152,6 +156,12 @@ public NodeStats(StreamInput in) throws IOException {
} else {
searchBackpressureStats = null;
}

if (in.getVersion().onOrAfter(Version.V_2_5_0)) {
clusterManagerThrottlingStats = in.readOptionalWriteable(ClusterManagerThrottlingStats::new);
} else {
clusterManagerThrottlingStats = null;
}
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
}

public NodeStats(
Expand All @@ -173,7 +183,8 @@ public NodeStats(
@Nullable ScriptCacheStats scriptCacheStats,
@Nullable IndexingPressureStats indexingPressureStats,
@Nullable ShardIndexingPressureStats shardIndexingPressureStats,
@Nullable SearchBackpressureStats searchBackpressureStats
@Nullable SearchBackpressureStats searchBackpressureStats,
@Nullable ClusterManagerThrottlingStats clusterManagerThrottlingStats
) {
super(node);
this.timestamp = timestamp;
Expand All @@ -194,6 +205,7 @@ public NodeStats(
this.indexingPressureStats = indexingPressureStats;
this.shardIndexingPressureStats = shardIndexingPressureStats;
this.searchBackpressureStats = searchBackpressureStats;
this.clusterManagerThrottlingStats = clusterManagerThrottlingStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -308,6 +320,11 @@ public SearchBackpressureStats getSearchBackpressureStats() {
return searchBackpressureStats;
}

@Nullable
public ClusterManagerThrottlingStats getClusterManagerThrottlingStats() {
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
return clusterManagerThrottlingStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -336,6 +353,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_4_0)) {
out.writeOptionalWriteable(searchBackpressureStats);
}
if (out.getVersion().onOrAfter(Version.V_2_5_0)) {
out.writeOptionalWriteable(clusterManagerThrottlingStats);
}
}

@Override
Expand Down Expand Up @@ -411,6 +431,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getSearchBackpressureStats() != null) {
getSearchBackpressureStats().toXContent(builder, params);
}
if (getClusterManagerThrottlingStats() != null) {
getClusterManagerThrottlingStats().toXContent(builder, params);
}
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ public enum Metric {
SCRIPT_CACHE("script_cache"),
INDEXING_PRESSURE("indexing_pressure"),
SHARD_INDEXING_PRESSURE("shard_indexing_pressure"),
SEARCH_BACKPRESSURE("search_backpressure");
SEARCH_BACKPRESSURE("search_backpressure"),
CLUSTER_MANAGER_THROTTLING("cluster_manager_throttling");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics),
NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.SHARD_INDEXING_PRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.SEARCH_BACKPRESSURE.containedIn(metrics)
NodesStatsRequest.Metric.SEARCH_BACKPRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.CLUSTER_MANAGER_THROTTLING.containedIn(metrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,29 @@

package org.opensearch.cluster.service;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Contains stats of Cluster Manager Task Throttling.
* It stores the total cumulative count of throttled tasks per task type.
*/
public class ClusterManagerThrottlingStats implements ClusterManagerTaskThrottlerListener {
public class ClusterManagerThrottlingStats implements ClusterManagerTaskThrottlerListener, Writeable, ToXContentFragment {

private Map<String, CounterMetric> throttledTasksCount = new ConcurrentHashMap<>();
private Map<String, CounterMetric> throttledTasksCount;

public ClusterManagerThrottlingStats() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might need additional metrics on throttle_time

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please elaborate more on this?
What are we expecting out of the throttle_time metric?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

time since we last throttled

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Time since the cluster manager last throttled any task? Or tracked separately per task type as well?

I think we can achieve it, since we can also record the time at which the cluster manager node last performed throttling.

I am just wondering how this additional metric will help?

throttledTasksCount = new ConcurrentHashMap<>();
}

private void incrementThrottlingCount(String type, final int counts) {
throttledTasksCount.computeIfAbsent(type, k -> new CounterMetric()).inc(counts);
Expand All @@ -39,4 +50,38 @@ public long getTotalThrottledTaskCount() {
public void onThrottle(String type, int counts) {
    // Look up (or lazily create) the counter for this task type and add the throttled count to it.
    final CounterMetric counterForType = throttledTasksCount.computeIfAbsent(type, k -> new CounterMetric());
    counterForType.inc(counts);
}

/**
 * Writes the per-task-type throttled counts to the stream as an int entry count
 * followed by that many (task type string, count long) pairs.
 *
 * @param out stream to write to
 * @throws IOException if writing to the stream fails
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
    // Serialize from a private snapshot: the backing map can gain new task types concurrently
    // via onThrottle, and the entry count written up front must match the entries that follow.
    final Map<String, CounterMetric> snapshot = new ConcurrentHashMap<>(throttledTasksCount);
    out.writeInt(snapshot.size());
    for (Map.Entry<String, CounterMetric> entry : snapshot.entrySet()) {
        out.writeString(entry.getKey());
        out.writeLong(entry.getValue().count());
    }
}

/**
 * Reads throttling stats from the stream: an int entry count followed by that many
 * (task type string, throttled count long) pairs, the layout produced by writeTo.
 *
 * @param in stream to read from
 * @throws IOException if reading from the stream fails
 */
public ClusterManagerThrottlingStats(StreamInput in) throws IOException {
    int throttledTaskEntries = in.readInt();
    throttledTasksCount = new ConcurrentHashMap<>();
    for (int i = 0; i < throttledTaskEntries; i++) {
        String taskType = in.readString();
        long throttledTaskCount = in.readLong();
        // Add the full long value: narrowing it to int would silently corrupt any
        // cumulative count larger than Integer.MAX_VALUE.
        throttledTasksCount.computeIfAbsent(taskType, k -> new CounterMetric()).inc(throttledTaskCount);
    }
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject("cluster_manager_throttling");
builder.startObject("cluster_manager_stats");
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
builder.field("TotalThrottledTasks", getTotalThrottledTaskCount());
builder.startObject("ThrottledTasksPerTaskType");
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
for (Map.Entry<String, CounterMetric> entry : throttledTasksCount.entrySet()) {
builder.field(entry.getKey(), entry.getValue().count());
}
builder.endObject();
builder.endObject();
return builder.endObject();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,13 @@ public long numberOfThrottledPendingTasks() {
return throttlingStats.getTotalThrottledTaskCount();
}

/**
 * Returns the throttling stats object held by this service.
 * The held instance itself is returned, not a copy.
 */
public ClusterManagerThrottlingStats getThrottlingStats() {
    return this.throttlingStats;
}

/**
* Returns the min version of nodes in cluster
*/
Expand Down
8 changes: 6 additions & 2 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class NodeService implements Closeable {
private final IndexingPressureService indexingPressureService;
private final AggregationUsageService aggregationUsageService;
private final SearchBackpressureService searchBackpressureService;
private final ClusterService clusterService;

private final Discovery discovery;

Expand Down Expand Up @@ -123,6 +124,7 @@ public class NodeService implements Closeable {
this.indexingPressureService = indexingPressureService;
this.aggregationUsageService = aggregationUsageService;
this.searchBackpressureService = searchBackpressureService;
this.clusterService = clusterService;
clusterService.addStateApplier(ingestService);
}

Expand Down Expand Up @@ -174,7 +176,8 @@ public NodeStats stats(
boolean scriptCache,
boolean indexingPressure,
boolean shardIndexingPressure,
boolean searchBackpressure
boolean searchBackpressure,
boolean clusterManagerThrottling
) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
Expand All @@ -197,7 +200,8 @@ public NodeStats stats(
scriptCache ? scriptService.cacheStats() : null,
indexingPressure ? this.indexingPressureService.nodeStats() : null,
shardIndexingPressure ? this.indexingPressureService.shardStats(indices) : null,
searchBackpressure ? this.searchBackpressureService.nodeStats() : null
searchBackpressure ? this.searchBackpressureService.nodeStats() : null,
clusterManagerThrottling ? this.clusterService.getClusterManagerService().getThrottlingStats() : null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ List<NodeStats> adjustNodesStats(List<NodeStats> nodesStats) {
nodeStats.getScriptCacheStats(),
nodeStats.getIndexingPressureStats(),
nodeStats.getShardIndexingPressureStats(),
nodeStats.getSearchBackpressureStats()
nodeStats.getSearchBackpressureStats(),
nodeStats.getClusterManagerThrottlingStats()
);
}).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2686,6 +2686,7 @@ public void ensureEstimatedStats() {
false,
false,
false,
false,
false
);
assertThat(
Expand Down