Skip to content

Commit

Permalink
Adding tests for high heap usage
Browse files Browse the repository at this point in the history
  • Loading branch information
PritLadani committed Nov 22, 2022
1 parent b272a12 commit cf0beb1
Showing 1 changed file with 87 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
import org.opensearch.search.backpressure.trackers.CpuUsageTracker;
import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskCancelledException;
import org.opensearch.tasks.TaskId;
Expand Down Expand Up @@ -85,81 +86,112 @@ public final void cleanupNodeSettings() {
public void testSearchShardTaskCancellationWithHighElapsedTime() throws InterruptedException {
Settings request = Settings.builder()
.put(SearchBackpressureSettings.SETTING_MODE.getKey(), "enforced")
.put(ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.getKey(), 5000)
.put(ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.getKey(), 1000)
.build();
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get());

CountDownLatch latch = new CountDownLatch(1);

client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_ELAPSED_TIME), new ActionListener<>() {
@Override
public void onResponse(TestResponse testResponse) {
fail("SearchShardTask should have been cancelled");
latch.countDown();
}

@Override
public void onFailure(Exception e) {
assertEquals(TaskCancelledException.class, e.getClass());
assertTrue(e.getMessage().contains("elapsed time exceeded"));
latch.countDown();
}
});

ExceptionCatchingListener listener = new ExceptionCatchingListener(latch);
client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_ELAPSED_TIME), listener);
latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS);

Exception caughtException = listener.getException();
assertNotNull("SearchShardTask should have been cancelled with TaskCancelledException", caughtException);
assertEquals(TaskCancelledException.class, caughtException.getClass());
assertTrue(caughtException.getMessage().contains("elapsed time exceeded"));
}

public void testSearchShardTaskCancellationWithHighCpu() throws InterruptedException {
Settings request = Settings.builder()
.put(SearchBackpressureSettings.SETTING_MODE.getKey(), "enforced")
.put(CpuUsageTracker.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 15000)
.put(CpuUsageTracker.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 1000)
.build();
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get());

CountDownLatch latch = new CountDownLatch(1);
ExceptionCatchingListener listener = new ExceptionCatchingListener(latch);
client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_CPU), listener);
latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS);

client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_CPU), new ActionListener<>() {
@Override
public void onResponse(TestResponse testResponse) {
fail("SearchShardTask should have been cancelled");
latch.countDown();
}
Exception caughtException = listener.getException();
assertNotNull("SearchShardTask should have been cancelled with TaskCancelledException", caughtException);
assertEquals(TaskCancelledException.class, caughtException.getClass());
assertTrue(caughtException.getMessage().contains("cpu usage exceeded"));
}

@Override
public void onFailure(Exception e) {
assertEquals(TaskCancelledException.class, e.getClass());
assertTrue(e.getMessage().contains("cpu usage exceeded"));
latch.countDown();
}
});
public void testSearchShardTaskCancellationWithHighHeapUsage() throws InterruptedException {
// Before SearchBackpressureService cancels a task based on its heap usage, we need to build up the heap moving average
// To build up the heap moving average, we need to hit the same node with multiple requests and then hit the same node with a
// request having higher heap usage
String node = internalCluster().startDataOnlyNode();
final int MOVING_AVERAGE_WINDOW_SIZE = 10;
Settings request = Settings.builder()
.put(SearchBackpressureSettings.SETTING_MODE.getKey(), "enforced")
.put(HeapUsageTracker.SETTING_HEAP_PERCENT_THRESHOLD.getKey(), 0.00001)
.put(HeapUsageTracker.SETTING_HEAP_VARIANCE_THRESHOLD.getKey(), 1.0)
.put(HeapUsageTracker.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE.getKey(), MOVING_AVERAGE_WINDOW_SIZE)
.build();
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get());

CountDownLatch latch = new CountDownLatch(1);
ExceptionCatchingListener listener = new ExceptionCatchingListener(latch);
for (int i = 0; i < MOVING_AVERAGE_WINDOW_SIZE; i++) {
client(node).execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_HEAP), listener);
}

latch = new CountDownLatch(1);
listener = new ExceptionCatchingListener(latch);
client(node).execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGHER_HEAP), listener);
latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS);

Exception caughtException = listener.getException();
assertNotNull("SearchShardTask should have been cancelled with TaskCancelledException", caughtException);
assertEquals(TaskCancelledException.class, caughtException.getClass());
assertTrue(caughtException.getMessage().contains("heap usage exceeded"));
}

public void testSearchCancellationWithBackpressureDisabled() throws InterruptedException {
Settings request = Settings.builder().put(SearchBackpressureSettings.SETTING_MODE.getKey(), "monitor_only").build();
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get());

CountDownLatch latch = new CountDownLatch(1);
ExceptionCatchingListener listener = new ExceptionCatchingListener(latch);
client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_ELAPSED_TIME), listener);
latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS);

client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_ELAPSED_TIME), new ActionListener<>() {
@Override
public void onResponse(TestResponse testResponse) {
latch.countDown();
}
Exception caughtException = listener.getException();
assertNull("SearchShardTask shouldn't have cancelled for monitor_only mode", caughtException);

@Override
public void onFailure(Exception e) {
fail("SearchShardTask shouldn't have cancelled for monitor_only mode");
latch.countDown();
}
});
}

latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS);
private static class ExceptionCatchingListener implements ActionListener<TestResponse> {
private final CountDownLatch latch;
private Exception exception = null;

public ExceptionCatchingListener(CountDownLatch latch) {
this.latch = latch;
}

@Override
public void onResponse(TestResponse r) {
latch.countDown();
}

@Override
public void onFailure(Exception e) {
this.exception = e;
latch.countDown();
}

private Exception getException() {
return exception;
}
}

enum RequestType {
HIGH_CPU,
HIGH_HEAP,
HIGHER_HEAP,
HIGH_ELAPSED_TIME;
}

Expand Down Expand Up @@ -223,9 +255,12 @@ protected void doExecute(Task task, TestRequest request, ActionListener<TestResp
long startTime = System.nanoTime();

// Doing a busy-wait until task cancellation or timeout.
// We are running HIGH_HEAP requests to build up heap moving average and not expect it to get cancelled.
do {
doWork(request);
} while (searchShardTask.isCancelled() == false && (System.nanoTime() - startTime) < TIMEOUT.getNanos());
} while (request.type != RequestType.HIGH_HEAP
&& searchShardTask.isCancelled() == false
&& (System.nanoTime() - startTime) < TIMEOUT.getNanos());

if (searchShardTask.isCancelled()) {
throw new TaskCancelledException(searchShardTask.getReasonCancelled());
Expand All @@ -248,6 +283,14 @@ private void doWork(TestRequest request) throws InterruptedException {
i++;
} while (i < iterations);
break;
case HIGH_HEAP:
Byte[] bytes = new Byte[100000];
int[] ints = new int[1000];
break;
case HIGHER_HEAP:
Byte[] more_bytes = new Byte[1000000];
int[] more_ints = new int[10000];
break;
case HIGH_ELAPSED_TIME:
Thread.sleep(100);
break;
Expand Down

0 comments on commit cf0beb1

Please sign in to comment.