Skip to content

Commit

Permalink
Bug/sbp cancellation (#13474)
Browse files Browse the repository at this point in the history
* change cancellation logic to fix disparity bw trackers and resource duress

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* add additional tests for searchBackpressureService and refactor code

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* add enumMap instead of list for tracking taskResourceUsageTrackets

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* add nodeNotInDuress test for nodeDuressTrackers class

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* address comments

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* add entry in CHANGELOG

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* address comments

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* address comments

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* remove wildcard import

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* streamline imports

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* address comments

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* add additional test case to test the circuit breaker for SBP logic

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* add missing javadoc to resourece type enum

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* add javadoc to a method

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* fix javadoc warnings

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* fix javadoc warnings

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

---------

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
  • Loading branch information
kaushalmahi12 authored Jun 21, 2024
1 parent 568c193 commit bcccedb
Show file tree
Hide file tree
Showing 23 changed files with 915 additions and 335 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Removed

### Fixed
- Fix bug in SBP cancellation logic ([#13259](https://github.com/opensearch-project/OpenSearch/pull/13474))
- Fix handling of Short and Byte data types in ScriptProcessor ingest pipeline ([#14379](https://github.com/opensearch-project/OpenSearch/issues/14379))
- Switch to iterative version of WKT format parser ([#14086](https://github.com/opensearch-project/OpenSearch/pull/14086))
- Fix the computed max shards of cluster to avoid int overflow ([#14155](https://github.com/opensearch-project/OpenSearch/pull/14155))
Expand Down
41 changes: 41 additions & 0 deletions server/src/main/java/org/opensearch/search/ResourceType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search;

/**
* Enum to hold the resource type
*/
public enum ResourceType {
CPU("cpu"),
JVM("jvm");

private final String name;

ResourceType(String name) {
this.name = name;
}

/**
* The string match here is case-sensitive
* @param s name matching the resource type name
* @return a {@link ResourceType}
*/
public static ResourceType fromName(String s) {
for (ResourceType resourceType : values()) {
if (resourceType.getName().equals(s)) {
return resourceType;
}
}
throw new IllegalArgumentException("Unknown resource type: [" + s + "]");
}

private String getName() {
return name;
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import org.opensearch.search.backpressure.trackers.CpuUsageTracker;
import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;

import java.io.IOException;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import org.opensearch.search.backpressure.trackers.CpuUsageTracker;
import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;

import java.io.IOException;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskCancellation;

Expand All @@ -34,35 +35,37 @@ public class CpuUsageTracker extends TaskResourceUsageTracker {
private final LongSupplier thresholdSupplier;

public CpuUsageTracker(LongSupplier thresholdSupplier) {
this(thresholdSupplier, (task) -> {
long usage = task.getTotalResourceStats().getCpuTimeInNanos();
long threshold = thresholdSupplier.getAsLong();

if (usage < threshold) {
return Optional.empty();
}

return Optional.of(
new TaskCancellation.Reason(
"cpu usage exceeded ["
+ new TimeValue(usage, TimeUnit.NANOSECONDS)
+ " >= "
+ new TimeValue(threshold, TimeUnit.NANOSECONDS)
+ "]",
1 // TODO: fine-tune the cancellation score/weight
)
);
});
}

public CpuUsageTracker(LongSupplier thresholdSupplier, ResourceUsageBreachEvaluator resourceUsageBreachEvaluator) {
this.thresholdSupplier = thresholdSupplier;
this.resourceUsageBreachEvaluator = resourceUsageBreachEvaluator;
}

@Override
public String name() {
return CPU_USAGE_TRACKER.getName();
}

@Override
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
long usage = task.getTotalResourceStats().getCpuTimeInNanos();
long threshold = thresholdSupplier.getAsLong();

if (usage < threshold) {
return Optional.empty();
}

return Optional.of(
new TaskCancellation.Reason(
"cpu usage exceeded ["
+ new TimeValue(usage, TimeUnit.NANOSECONDS)
+ " >= "
+ new TimeValue(threshold, TimeUnit.NANOSECONDS)
+ "]",
1 // TODO: fine-tune the cancellation score/weight
)
);
}

@Override
public TaskResourceUsageTracker.Stats stats(List<? extends Task> activeTasks) {
long currentMax = activeTasks.stream().mapToLong(t -> t.getTotalResourceStats().getCpuTimeInNanos()).max().orElse(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskCancellation;

Expand All @@ -34,36 +35,42 @@ public class ElapsedTimeTracker extends TaskResourceUsageTracker {
private final LongSupplier timeNanosSupplier;

public ElapsedTimeTracker(LongSupplier thresholdSupplier, LongSupplier timeNanosSupplier) {
this(thresholdSupplier, timeNanosSupplier, (Task task) -> {
long usage = timeNanosSupplier.getAsLong() - task.getStartTimeNanos();
long threshold = thresholdSupplier.getAsLong();

if (usage < threshold) {
return Optional.empty();
}

return Optional.of(
new TaskCancellation.Reason(
"elapsed time exceeded ["
+ new TimeValue(usage, TimeUnit.NANOSECONDS)
+ " >= "
+ new TimeValue(threshold, TimeUnit.NANOSECONDS)
+ "]",
1 // TODO: fine-tune the cancellation score/weight
)
);
});
}

public ElapsedTimeTracker(
LongSupplier thresholdSupplier,
LongSupplier timeNanosSupplier,
ResourceUsageBreachEvaluator resourceUsageBreachEvaluator
) {
this.thresholdSupplier = thresholdSupplier;
this.timeNanosSupplier = timeNanosSupplier;
this.resourceUsageBreachEvaluator = resourceUsageBreachEvaluator;
}

@Override
public String name() {
return ELAPSED_TIME_TRACKER.getName();
}

@Override
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
long usage = timeNanosSupplier.getAsLong() - task.getStartTimeNanos();
long threshold = thresholdSupplier.getAsLong();

if (usage < threshold) {
return Optional.empty();
}

return Optional.of(
new TaskCancellation.Reason(
"elapsed time exceeded ["
+ new TimeValue(usage, TimeUnit.NANOSECONDS)
+ " >= "
+ new TimeValue(threshold, TimeUnit.NANOSECONDS)
+ "]",
1 // TODO: fine-tune the cancellation score/weight
)
);
}

@Override
public TaskResourceUsageTracker.Stats stats(List<? extends Task> activeTasks) {
long now = timeNanosSupplier.getAsLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.monitor.jvm.JvmStats;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackers.TaskResourceUsageTracker;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskCancellation;
Expand Down Expand Up @@ -55,6 +56,43 @@ public HeapUsageTracker(
this.heapPercentThresholdSupplier = heapPercentThresholdSupplier;
this.movingAverageReference = new AtomicReference<>(new MovingAverage(heapMovingAverageWindowSize));
clusterSettings.addSettingsUpdateConsumer(windowSizeSetting, this::updateWindowSize);
setDefaultResourceUsageBreachEvaluator();
}

/**
* Had to refactor this method out of the constructor as we can't pass a lambda which references a member variable in constructor
* error: cannot reference movingAverageReference before supertype constructor has been called
*/
private void setDefaultResourceUsageBreachEvaluator() {
this.resourceUsageBreachEvaluator = (task) -> {
MovingAverage movingAverage = movingAverageReference.get();

// There haven't been enough measurements.
if (movingAverage.isReady() == false) {
return Optional.empty();
}

double currentUsage = task.getTotalResourceStats().getMemoryInBytes();
double averageUsage = movingAverage.getAverage();
double variance = heapVarianceSupplier.getAsDouble();
double allowedUsage = averageUsage * variance;
double threshold = heapPercentThresholdSupplier.getAsDouble() * HEAP_SIZE_BYTES;

if (isHeapTrackingSupported() == false || currentUsage < threshold || currentUsage < allowedUsage) {
return Optional.empty();
}

return Optional.of(
new TaskCancellation.Reason(
"heap usage exceeded ["
+ new ByteSizeValue((long) currentUsage)
+ " >= "
+ new ByteSizeValue((long) allowedUsage)
+ "]",
(int) (currentUsage / averageUsage) // TODO: fine-tune the cancellation score/weight
)
);
};
}

@Override
Expand All @@ -67,33 +105,6 @@ public void update(Task task) {
movingAverageReference.get().record(task.getTotalResourceStats().getMemoryInBytes());
}

@Override
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
MovingAverage movingAverage = movingAverageReference.get();

// There haven't been enough measurements.
if (movingAverage.isReady() == false) {
return Optional.empty();
}

double currentUsage = task.getTotalResourceStats().getMemoryInBytes();
double averageUsage = movingAverage.getAverage();
double variance = heapVarianceSupplier.getAsDouble();
double allowedUsage = averageUsage * variance;
double threshold = heapPercentThresholdSupplier.getAsDouble() * HEAP_SIZE_BYTES;

if (isHeapTrackingSupported() == false || currentUsage < threshold || currentUsage < allowedUsage) {
return Optional.empty();
}

return Optional.of(
new TaskCancellation.Reason(
"heap usage exceeded [" + new ByteSizeValue((long) currentUsage) + " >= " + new ByteSizeValue((long) allowedUsage) + "]",
(int) (currentUsage / averageUsage) // TODO: fine-tune the cancellation score/weight
)
);
}

private void updateWindowSize(int heapMovingAverageWindowSize) {
this.movingAverageReference.set(new MovingAverage(heapMovingAverageWindowSize));
}
Expand Down

This file was deleted.

Loading

0 comments on commit bcccedb

Please sign in to comment.