Skip to content

Commit

Permalink
Adding assertion on latch.await()
Browse files Browse the repository at this point in the history
Signed-off-by: PritLadani <pritkladani@gmail.com>
  • Loading branch information
PritLadani committed Nov 22, 2022
1 parent 16eba20 commit e336a04
Showing 1 changed file with 25 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.search.backpressure;

import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Before;
import org.opensearch.action.ActionListener;
Expand Down Expand Up @@ -47,12 +48,14 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE)
public class SearchBackpressureIT extends OpenSearchIntegTestCase {

private static final TimeValue TIMEOUT = new TimeValue(30, TimeUnit.SECONDS);
private static final TimeValue TIMEOUT = new TimeValue(10, TimeUnit.SECONDS);

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
Expand Down Expand Up @@ -90,15 +93,14 @@ public void testSearchShardTaskCancellationWithHighElapsedTime() throws Interrup
.build();
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get());

CountDownLatch latch = new CountDownLatch(1);
ExceptionCatchingListener listener = new ExceptionCatchingListener(latch);
ExceptionCatchingListener listener = new ExceptionCatchingListener();
client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_ELAPSED_TIME), listener);
latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS);
assertTrue(listener.latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS));

Exception caughtException = listener.getException();
assertNotNull("SearchShardTask should have been cancelled with TaskCancelledException", caughtException);
assertEquals(TaskCancelledException.class, caughtException.getClass());
assertTrue(caughtException.getMessage().contains("elapsed time exceeded"));
MatcherAssert.assertThat(caughtException, instanceOf(TaskCancelledException.class));
MatcherAssert.assertThat(caughtException.getMessage(), containsString("elapsed time exceeded"));
}

public void testSearchShardTaskCancellationWithHighCpu() throws InterruptedException {
Expand All @@ -108,15 +110,14 @@ public void testSearchShardTaskCancellationWithHighCpu() throws InterruptedExcep
.build();
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get());

CountDownLatch latch = new CountDownLatch(1);
ExceptionCatchingListener listener = new ExceptionCatchingListener(latch);
ExceptionCatchingListener listener = new ExceptionCatchingListener();
client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_CPU), listener);
latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS);
assertTrue(listener.latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS));

Exception caughtException = listener.getException();
assertNotNull("SearchShardTask should have been cancelled with TaskCancelledException", caughtException);
assertEquals(TaskCancelledException.class, caughtException.getClass());
assertTrue(caughtException.getMessage().contains("cpu usage exceeded"));
MatcherAssert.assertThat(caughtException, instanceOf(TaskCancelledException.class));
MatcherAssert.assertThat(caughtException.getMessage(), containsString("cpu usage exceeded"));
}

public void testSearchShardTaskCancellationWithHighHeapUsage() throws InterruptedException {
Expand All @@ -133,43 +134,43 @@ public void testSearchShardTaskCancellationWithHighHeapUsage() throws Interrupte
.build();
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get());

CountDownLatch latch = new CountDownLatch(1);
ExceptionCatchingListener listener = new ExceptionCatchingListener(latch);
ExceptionCatchingListener listener = new ExceptionCatchingListener();
for (int i = 0; i < MOVING_AVERAGE_WINDOW_SIZE; i++) {
client(node).execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_HEAP), listener);
}

latch = new CountDownLatch(1);
listener = new ExceptionCatchingListener(latch);
listener = new ExceptionCatchingListener();
client(node).execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGHER_HEAP), listener);
latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS);
assertTrue(listener.latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS));

Exception caughtException = listener.getException();
assertNotNull("SearchShardTask should have been cancelled with TaskCancelledException", caughtException);
assertEquals(TaskCancelledException.class, caughtException.getClass());
assertTrue(caughtException.getMessage().contains("heap usage exceeded"));
MatcherAssert.assertThat(caughtException, instanceOf(TaskCancelledException.class));
MatcherAssert.assertThat(caughtException.getMessage(), containsString("heap usage exceeded"));
}

public void testSearchCancellationWithBackpressureDisabled() throws InterruptedException {
Settings request = Settings.builder().put(SearchBackpressureSettings.SETTING_MODE.getKey(), "monitor_only").build();
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get());

CountDownLatch latch = new CountDownLatch(1);
ExceptionCatchingListener listener = new ExceptionCatchingListener(latch);
ExceptionCatchingListener listener = new ExceptionCatchingListener();
client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_ELAPSED_TIME), listener);
latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS);
// waiting for the TIMEOUT * 3 time for the request to complete and the latch to countdown.
assertTrue(
"SearchShardTask should have been completed by now and countdown the latch",
listener.latch.await(TIMEOUT.getSeconds() * 3, TimeUnit.SECONDS)
);

Exception caughtException = listener.getException();
assertNull("SearchShardTask shouldn't have cancelled for monitor_only mode", caughtException);

}

private static class ExceptionCatchingListener implements ActionListener<TestResponse> {
private final CountDownLatch latch;
private Exception exception = null;

public ExceptionCatchingListener(CountDownLatch latch) {
this.latch = latch;
public ExceptionCatchingListener() {
this.latch = new CountDownLatch(1);
}

@Override
Expand Down

0 comments on commit e336a04

Please sign in to comment.