Skip to content

Commit

Permalink
Refactoring changes
Browse files Browse the repository at this point in the history
Signed-off-by: Ketan Verma <ketan9495@gmail.com>
  • Loading branch information
ketanv3 committed Oct 20, 2022
1 parent 961ad71 commit aeb6521
Show file tree
Hide file tree
Showing 11 changed files with 50 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ TaskCancellation getTaskCancellation(CancellableTask task) {
List<Runnable> callbacks = new ArrayList<>();

for (TaskResourceUsageTracker tracker : taskResourceUsageTrackers) {
Optional<TaskCancellation.Reason> reason = tracker.cancellationReason(task);
Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
if (reason.isPresent()) {
reasons.add(reason.get());
callbacks.add(tracker::incrementCancellations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* @opensearch.internal
*/
public class CpuUsageTracker extends TaskResourceUsageTracker {
public static final String NAME = "cpu_usage_tracker";
public static final TaskResourceUsageTrackerType TYPE = TaskResourceUsageTrackerType.CPU_USAGE_TRACKER;

private final LongSupplier cpuTimeNanosThresholdSupplier;

Expand All @@ -33,14 +33,11 @@ public CpuUsageTracker(SearchBackpressureSettings settings) {

@Override
public String name() {
return NAME;
return TYPE.getName();
}

@Override
public void update(Task task) {}

@Override
public Optional<TaskCancellation.Reason> cancellationReason(Task task) {
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
long usage = task.getTotalResourceStats().getCpuTimeInNanos();
long threshold = cpuTimeNanosThresholdSupplier.getAsLong();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* @opensearch.internal
*/
public class ElapsedTimeTracker extends TaskResourceUsageTracker {
public static final String NAME = "elapsed_time_tracker";
public static final TaskResourceUsageTrackerType TYPE = TaskResourceUsageTrackerType.ELAPSED_TIME_TRACKER;

private final LongSupplier timeNanosSupplier;
private final LongSupplier elapsedTimeNanosThresholdSupplier;
Expand All @@ -35,14 +35,11 @@ public ElapsedTimeTracker(SearchBackpressureSettings settings, LongSupplier time

@Override
public String name() {
return NAME;
return TYPE.getName();
}

@Override
public void update(Task task) {}

@Override
public Optional<TaskCancellation.Reason> cancellationReason(Task task) {
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
long usage = timeNanosSupplier.getAsLong() - task.getStartTimeNanos();
long threshold = elapsedTimeNanosThresholdSupplier.getAsLong();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* @opensearch.internal
*/
public class HeapUsageTracker extends TaskResourceUsageTracker implements SearchShardTaskSettings.Listener {
public static final String NAME = "heap_usage_tracker";
public static final TaskResourceUsageTrackerType TYPE = TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER;

private final LongSupplier heapBytesThresholdSupplier;
private final DoubleSupplier heapVarianceThresholdSupplier;
Expand All @@ -44,7 +44,7 @@ public HeapUsageTracker(SearchBackpressureSettings settings) {

@Override
public String name() {
return NAME;
return TYPE.getName();
}

@Override
Expand All @@ -53,7 +53,7 @@ public void update(Task task) {
}

@Override
public Optional<TaskCancellation.Reason> cancellationReason(Task task) {
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
MovingAverage movingAverage = movingAverageReference.get();

// There haven't been enough measurements.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ public long getCancellations() {
/**
* Notifies the tracker to update its state when a task execution completes.
*/
public abstract void update(Task task);
public void update(Task task) {}

/**
* Returns the cancellation reason for the given task, if it's eligible for cancellation.
*/
public abstract Optional<TaskCancellation.Reason> cancellationReason(Task task);
public abstract Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.backpressure.trackers;

/**
* Defines the type of TaskResourceUsageTracker.
*/
public enum TaskResourceUsageTrackerType {
CPU_USAGE_TRACKER("cpu_usage_tracker"),
HEAP_USAGE_TRACKER("heap_usage_tracker"),
ELAPSED_TIME_TRACKER("elapsed_time_tracker");

private final String name;

TaskResourceUsageTrackerType(String name) {
this.name = name;
}

public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public String name() {
public void update(Task task) {}

@Override
public Optional<TaskCancellation.Reason> cancellationReason(Task task) {
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
if (task.getTotalResourceStats().getCpuTimeInNanos() < 300) {
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public void testEligibleForCancellation() {
Task task = createMockTaskWithResourceStats(SearchShardTask.class, 200000000, 200);
CpuUsageTracker tracker = new CpuUsageTracker(mockSettings);

Optional<TaskCancellation.Reason> reason = tracker.cancellationReason(task);
Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
assertTrue(reason.isPresent());
assertEquals(1, reason.get().getCancellationScore());
assertEquals("cpu usage exceeded [200ms >= 15ms]", reason.get().getMessage());
Expand All @@ -43,7 +43,7 @@ public void testNotEligibleForCancellation() {
Task task = createMockTaskWithResourceStats(SearchShardTask.class, 5000000, 200);
CpuUsageTracker tracker = new CpuUsageTracker(mockSettings);

Optional<TaskCancellation.Reason> reason = tracker.cancellationReason(task);
Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
assertFalse(reason.isPresent());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void testEligibleForCancellation() {
Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 0);
ElapsedTimeTracker tracker = new ElapsedTimeTracker(mockSettings, () -> 200000000);

Optional<TaskCancellation.Reason> reason = tracker.cancellationReason(task);
Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
assertTrue(reason.isPresent());
assertEquals(1, reason.get().getCancellationScore());
assertEquals("elapsed time exceeded [200ms >= 100ms]", reason.get().getMessage());
Expand All @@ -44,7 +44,7 @@ public void testNotEligibleForCancellation() {
Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 150000000);
ElapsedTimeTracker tracker = new ElapsedTimeTracker(mockSettings, () -> 200000000);

Optional<TaskCancellation.Reason> reason = tracker.cancellationReason(task);
Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
assertFalse(reason.isPresent());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void testEligibleForCancellation() {

// Task that has heap usage >= heapBytesThreshold and (movingAverage * heapVariance).
task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 200);
Optional<TaskCancellation.Reason> reason = tracker.cancellationReason(task);
Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);
assertTrue(reason.isPresent());
assertEquals(4, reason.get().getCancellationScore());
assertEquals("heap usage exceeded [200b >= 100b]", reason.get().getMessage());
Expand All @@ -59,7 +59,7 @@ public void testNotEligibleForCancellation() {
task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 99);

// Not enough observations.
reason = tracker.cancellationReason(task);
reason = tracker.checkAndMaybeGetCancellationReason(task);
assertFalse(reason.isPresent());

// Record enough observations to make the moving average 'ready'.
Expand All @@ -68,13 +68,13 @@ public void testNotEligibleForCancellation() {
}

// Task with heap usage < heapBytesThreshold should not be cancelled.
reason = tracker.cancellationReason(task);
reason = tracker.checkAndMaybeGetCancellationReason(task);
assertFalse(reason.isPresent());

// Task with heap usage between heapBytesThreshold and (movingAverage * heapVariance) should not be cancelled.
double allowedHeapUsage = 99.0 * 2.0;
task = createMockTaskWithResourceStats(SearchShardTask.class, 1, randomLongBetween(99, (long) allowedHeapUsage - 1));
reason = tracker.cancellationReason(task);
reason = tracker.checkAndMaybeGetCancellationReason(task);
assertFalse(reason.isPresent());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public String name() {
public void update(Task task) {}

@Override
public Optional<TaskCancellation.Reason> cancellationReason(Task task) {
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
return Optional.empty();
}
};
Expand Down

0 comments on commit aeb6521

Please sign in to comment.