Skip to content

Commit

Permalink
[Backport 2.x] Added search backpressure stats API
Browse files Browse the repository at this point in the history
Added search backpressure stats to the existing node/stats API to describe:
1. the number of cancellations (currently for SearchShardTask only)
2. the current state of TaskResourceUsageTracker

Signed-off-by: Ketan Verma <ketan9495@gmail.com>
  • Loading branch information
ketanv3 committed Nov 2, 2022
1 parent 72cafde commit 94feab9
Show file tree
Hide file tree
Showing 29 changed files with 857 additions and 55 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Use getParameterCount instead of getParameterTypes ([#4821](https://github.com/opensearch-project/OpenSearch/pull/4821))
- Added in-flight cancellation of SearchShardTask based on resource consumption ([#4565](https://github.com/opensearch-project/OpenSearch/pull/4565))
- Added resource usage trackers for in-flight cancellation of SearchShardTask ([#4805](https://github.com/opensearch-project/OpenSearch/pull/4805))
- Added search backpressure stats API ([#4932](https://github.com/opensearch-project/OpenSearch/pull/4932))

### Dependencies
- Bumps `com.diffplug.spotless` from 6.9.1 to 6.10.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.node.AdaptiveSelectionStats;
import org.opensearch.script.ScriptCacheStats;
import org.opensearch.script.ScriptStats;
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
import org.opensearch.threadpool.ThreadPoolStats;
import org.opensearch.transport.TransportStats;

Expand Down Expand Up @@ -119,6 +120,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private ShardIndexingPressureStats shardIndexingPressureStats;

@Nullable
private SearchBackpressureStats searchBackpressureStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -156,6 +160,11 @@ public NodeStats(StreamInput in) throws IOException {
shardIndexingPressureStats = null;
}

if (in.getVersion().onOrAfter(Version.V_2_4_0)) {
searchBackpressureStats = in.readOptionalWriteable(SearchBackpressureStats::new);
} else {
searchBackpressureStats = null;
}
}

public NodeStats(
Expand All @@ -176,7 +185,8 @@ public NodeStats(
@Nullable AdaptiveSelectionStats adaptiveSelectionStats,
@Nullable ScriptCacheStats scriptCacheStats,
@Nullable IndexingPressureStats indexingPressureStats,
@Nullable ShardIndexingPressureStats shardIndexingPressureStats
@Nullable ShardIndexingPressureStats shardIndexingPressureStats,
@Nullable SearchBackpressureStats searchBackpressureStats
) {
super(node);
this.timestamp = timestamp;
Expand All @@ -196,6 +206,7 @@ public NodeStats(
this.scriptCacheStats = scriptCacheStats;
this.indexingPressureStats = indexingPressureStats;
this.shardIndexingPressureStats = shardIndexingPressureStats;
this.searchBackpressureStats = searchBackpressureStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -305,6 +316,11 @@ public ShardIndexingPressureStats getShardIndexingPressureStats() {
return shardIndexingPressureStats;
}

/**
 * Returns the search backpressure stats held by this node-stats object.
 *
 * @return the stats, or {@code null} when none were set (the backing field is
 *         nullable and is explicitly set to {@code null} when deserializing from
 *         a stream older than {@code Version.V_2_4_0})
 */
@Nullable
public SearchBackpressureStats getSearchBackpressureStats() {
return searchBackpressureStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -336,6 +352,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
out.writeOptionalWriteable(shardIndexingPressureStats);
}
if (out.getVersion().onOrAfter(Version.V_2_4_0)) {
out.writeOptionalWriteable(searchBackpressureStats);
}
}

@Override
Expand Down Expand Up @@ -408,6 +427,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getShardIndexingPressureStats() != null) {
getShardIndexingPressureStats().toXContent(builder, params);
}
if (getSearchBackpressureStats() != null) {
getSearchBackpressureStats().toXContent(builder, params);
}
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ public enum Metric {
ADAPTIVE_SELECTION("adaptive_selection"),
SCRIPT_CACHE("script_cache"),
INDEXING_PRESSURE("indexing_pressure"),
SHARD_INDEXING_PRESSURE("shard_indexing_pressure");
SHARD_INDEXING_PRESSURE("shard_indexing_pressure"),
SEARCH_BACKPRESSURE("search_backpressure");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.ADAPTIVE_SELECTION.containedIn(metrics),
NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics),
NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.SHARD_INDEXING_PRESSURE.containedIn(metrics)
NodesStatsRequest.Metric.SHARD_INDEXING_PRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.SEARCH_BACKPRESSURE.containedIn(metrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,8 +591,7 @@ public void apply(Settings value, Settings current, Settings previous) {
TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED,

// Settings related to search backpressure
SearchBackpressureSettings.SETTING_ENABLED,
SearchBackpressureSettings.SETTING_ENFORCED,
SearchBackpressureSettings.SETTING_MODE,
SearchBackpressureSettings.SETTING_CANCELLATION_RATIO,
SearchBackpressureSettings.SETTING_CANCELLATION_RATE,
SearchBackpressureSettings.SETTING_CANCELLATION_BURST,
Expand Down
6 changes: 4 additions & 2 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ public NodeStats stats(
boolean adaptiveSelection,
boolean scriptCache,
boolean indexingPressure,
boolean shardIndexingPressure
boolean shardIndexingPressure,
boolean searchBackpressure
) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
Expand All @@ -195,7 +196,8 @@ public NodeStats stats(
adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null,
scriptCache ? scriptService.cacheStats() : null,
indexingPressure ? this.indexingPressureService.nodeStats() : null,
shardIndexingPressure ? this.indexingPressureService.shardStats(indices) : null
shardIndexingPressure ? this.indexingPressureService.shardStats(indices) : null,
searchBackpressure ? this.searchBackpressureService.nodeStats() : null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@
import org.opensearch.common.util.TokenBucket;
import org.opensearch.monitor.jvm.JvmStats;
import org.opensearch.monitor.process.ProcessProbe;
import org.opensearch.search.backpressure.settings.SearchBackpressureMode;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.stats.CancelledTaskStats;
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
import org.opensearch.search.backpressure.stats.SearchShardTaskStats;
import org.opensearch.search.backpressure.trackers.CpuUsageTracker;
import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
import org.opensearch.search.backpressure.trackers.NodeDuressTracker;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskCancellation;
Expand Down Expand Up @@ -117,7 +122,8 @@ public SearchBackpressureService(
}

void doRun() {
if (getSettings().isEnabled() == false) {
SearchBackpressureMode mode = getSettings().getMode();
if (mode == SearchBackpressureMode.DISABLED) {
return;
}

Expand All @@ -126,7 +132,7 @@ void doRun() {
}

// We are only targeting in-flight cancellation of SearchShardTask for now.
List<CancellableTask> searchShardTasks = getSearchShardTasks();
List<SearchShardTask> searchShardTasks = getSearchShardTasks();

// Force-refresh usage stats of these tasks before making a cancellation decision.
taskResourceTrackingService.refreshResourceStats(searchShardTasks.toArray(new Task[0]));
Expand All @@ -143,7 +149,7 @@ void doRun() {
taskCancellation.getReasonString()
);

if (getSettings().isEnforced() == false) {
if (mode != SearchBackpressureMode.ENFORCED) {
continue;
}

Expand All @@ -159,7 +165,6 @@ void doRun() {
}

taskCancellation.cancel();
state.incrementCancellationCount();
}
}

Expand All @@ -182,7 +187,7 @@ boolean isNodeInDuress() {
/**
* Returns true if the increase in heap usage is due to search requests.
*/
boolean isHeapUsageDominatedBySearch(List<CancellableTask> searchShardTasks) {
boolean isHeapUsageDominatedBySearch(List<SearchShardTask> searchShardTasks) {
long usage = searchShardTasks.stream().mapToLong(task -> task.getTotalResourceStats().getMemoryInBytes()).sum();
long threshold = getSettings().getSearchShardTaskSettings().getTotalHeapBytesThreshold();
if (usage < threshold) {
Expand All @@ -196,12 +201,12 @@ boolean isHeapUsageDominatedBySearch(List<CancellableTask> searchShardTasks) {
/**
* Filters and returns the list of currently running SearchShardTasks.
*/
List<CancellableTask> getSearchShardTasks() {
List<SearchShardTask> getSearchShardTasks() {
return taskResourceTrackingService.getResourceAwareTasks()
.values()
.stream()
.filter(task -> task instanceof SearchShardTask)
.map(task -> (CancellableTask) task)
.map(task -> (SearchShardTask) task)
.collect(Collectors.toUnmodifiableList());
}

Expand All @@ -222,13 +227,18 @@ TaskCancellation getTaskCancellation(CancellableTask task) {
}
}

if (task instanceof SearchShardTask) {
callbacks.add(state::incrementCancellationCount);
callbacks.add(() -> state.setLastCancelledTaskStats(CancelledTaskStats.from(task, timeNanosSupplier)));
}

return new TaskCancellation(task, reasons, callbacks);
}

/**
* Returns a list of TaskCancellations sorted by descending order of their cancellation scores.
*/
List<TaskCancellation> getTaskCancellations(List<CancellableTask> tasks) {
List<TaskCancellation> getTaskCancellations(List<? extends CancellableTask> tasks) {
return tasks.stream()
.map(this::getTaskCancellation)
.filter(TaskCancellation::isEligibleForCancellation)
Expand All @@ -246,7 +256,7 @@ SearchBackpressureState getState() {

@Override
public void onTaskCompleted(Task task) {
if (getSettings().isEnabled() == false) {
if (getSettings().getMode() == SearchBackpressureMode.DISABLED) {
return;
}

Expand Down Expand Up @@ -310,4 +320,18 @@ protected void doStop() {

@Override
protected void doClose() throws IOException {}

/**
 * Builds a snapshot of the search backpressure stats for this node.
 *
 * <p>The snapshot combines the counters and last-cancelled-task stats read from
 * {@code state} with one stats entry per registered task resource usage tracker
 * (keyed by the tracker type resolved from the tracker's name), computed against
 * the currently running search shard tasks, plus the current backpressure mode.
 *
 * <p>Note: {@code Collectors.toUnmodifiableMap} throws if two trackers resolve to
 * the same key or if a tracker yields a {@code null} stats value.
 *
 * @return the search backpressure stats for this node
 */
public SearchBackpressureStats nodeStats() {
    // Snapshot the running tasks once so every tracker reports against the same list.
    final List<SearchShardTask> activeTasks = getSearchShardTasks();

    return new SearchBackpressureStats(
        new SearchShardTaskStats(
            state.getCancellationCount(),
            state.getLimitReachedCount(),
            state.getLastCancelledTaskStats(),
            taskResourceUsageTrackers.stream()
                .collect(
                    Collectors.toUnmodifiableMap(
                        tracker -> TaskResourceUsageTrackerType.fromName(tracker.name()),
                        tracker -> tracker.stats(activeTasks)
                    )
                )
        ),
        getSettings().getMode()
    );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@

package org.opensearch.search.backpressure;

import org.opensearch.search.backpressure.stats.CancelledTaskStats;

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
* Tracks the current state of task completions and cancellations.
Expand All @@ -31,6 +34,11 @@ public class SearchBackpressureState {
*/
private final AtomicLong limitReachedCount = new AtomicLong();

/**
* Usage stats for the last cancelled task.
*/
private final AtomicReference<CancelledTaskStats> lastCancelledTaskStats = new AtomicReference<>();

/**
 * Returns the current value of the {@code completionCount} counter.
 *
 * @return the value read from {@code completionCount} at the time of the call
 */
public long getCompletionCount() {
return completionCount.get();
}
Expand All @@ -54,4 +62,12 @@ public long getLimitReachedCount() {
/**
 * Atomically adds one to the {@code limitReachedCount} counter.
 *
 * @return the counter value after the increment
 */
long incrementLimitReachedCount() {
return limitReachedCount.incrementAndGet();
}

/**
 * Returns the most recently stored stats of a cancelled task.
 *
 * @return the value currently held by the {@code lastCancelledTaskStats} reference;
 *         {@code null} until {@link #setLastCancelledTaskStats} has stored a value,
 *         since the reference is created empty
 */
public CancelledTaskStats getLastCancelledTaskStats() {
return lastCancelledTaskStats.get();
}

/**
 * Replaces the stored stats of the last cancelled task.
 *
 * @param stats the new value to store in the {@code lastCancelledTaskStats} reference;
 *              it overwrites whatever was stored before
 */
public void setLastCancelledTaskStats(CancelledTaskStats stats) {
lastCancelledTaskStats.set(stats);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.backpressure.settings;

/**
 * Operating modes of the search backpressure feature.
 *
 * <p>Each constant carries a lower-case textual name, which is the form accepted
 * by {@link #fromName(String)} and returned by {@link #getName()}.
 */
public enum SearchBackpressureMode {
    /**
     * SearchBackpressureService is completely disabled.
     */
    DISABLED("disabled"),

    /**
     * SearchBackpressureService only monitors the resource usage of running tasks.
     */
    MONITOR_ONLY("monitor_only"),

    /**
     * SearchBackpressureService monitors and rejects tasks that exceed resource usage thresholds.
     */
    ENFORCED("enforced");

    /** Textual name of this mode, as accepted by {@link #fromName(String)}. */
    private final String name;

    SearchBackpressureMode(String name) {
        this.name = name;
    }

    /**
     * Returns the textual name of this mode.
     *
     * @return the name, e.g. {@code "monitor_only"}
     */
    public String getName() {
        return name;
    }

    /**
     * Resolves a mode from its textual name. The match is exact and case-sensitive.
     *
     * @param name the textual name of the mode
     * @return the mode whose name equals {@code name}
     * @throws IllegalArgumentException if no mode has the given name
     * @throws NullPointerException if {@code name} is {@code null}
     */
    public static SearchBackpressureMode fromName(String name) {
        for (SearchBackpressureMode candidate : values()) {
            // The argument is deliberately the receiver of equals(): a null name
            // fails with NullPointerException instead of falling through.
            if (name.equals(candidate.name)) {
                return candidate;
            }
        }

        throw new IllegalArgumentException("invalid SearchBackpressureMode: " + name);
    }
}
Loading

0 comments on commit 94feab9

Please sign in to comment.