diff --git a/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java b/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java index 1e4702409df02..f8629e2c88b07 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java @@ -8,6 +8,7 @@ package org.opensearch.search.backpressure; +import org.hamcrest.MatcherAssert; import org.junit.After; import org.junit.Before; import org.opensearch.action.ActionListener; @@ -47,12 +48,14 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE) public class SearchBackpressureIT extends OpenSearchIntegTestCase { - private static final TimeValue TIMEOUT = new TimeValue(30, TimeUnit.SECONDS); + private static final TimeValue TIMEOUT = new TimeValue(10, TimeUnit.SECONDS); @Override protected Collection> nodePlugins() { @@ -90,15 +93,14 @@ public void testSearchShardTaskCancellationWithHighElapsedTime() throws Interrup .build(); assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); - CountDownLatch latch = new CountDownLatch(1); - ExceptionCatchingListener listener = new ExceptionCatchingListener(latch); + ExceptionCatchingListener listener = new ExceptionCatchingListener(); client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_ELAPSED_TIME), listener); - latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS); + assertTrue(listener.latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS)); Exception caughtException = listener.getException(); assertNotNull("SearchShardTask should have been cancelled with TaskCancelledException", caughtException); - assertEquals(TaskCancelledException.class, caughtException.getClass()); - assertTrue(caughtException.getMessage().contains("elapsed time exceeded")); + MatcherAssert.assertThat(caughtException, instanceOf(TaskCancelledException.class)); + MatcherAssert.assertThat(caughtException.getMessage(), containsString("elapsed time exceeded")); } public void testSearchShardTaskCancellationWithHighCpu() throws InterruptedException { @@ -108,15 +110,14 @@ public void testSearchShardTaskCancellationWithHighCpu() throws InterruptedExcep .build(); assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); - CountDownLatch latch = new CountDownLatch(1); - ExceptionCatchingListener listener = new ExceptionCatchingListener(latch); + ExceptionCatchingListener listener = new ExceptionCatchingListener(); client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_CPU), listener); - latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS); + assertTrue(listener.latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS)); Exception caughtException = listener.getException(); assertNotNull("SearchShardTask should have been cancelled with TaskCancelledException", caughtException); - assertEquals(TaskCancelledException.class, caughtException.getClass()); - assertTrue(caughtException.getMessage().contains("cpu usage exceeded")); + MatcherAssert.assertThat(caughtException, instanceOf(TaskCancelledException.class)); + MatcherAssert.assertThat(caughtException.getMessage(), containsString("cpu usage exceeded")); } public void testSearchShardTaskCancellationWithHighHeapUsage() throws InterruptedException { @@ -133,43 +134,43 @@ public void testSearchShardTaskCancellationWithHighHeapUsage() throws Interrupte .build(); assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); - CountDownLatch latch = new CountDownLatch(1); - ExceptionCatchingListener listener = new ExceptionCatchingListener(latch); + ExceptionCatchingListener listener = new ExceptionCatchingListener(); for (int i = 0; i < MOVING_AVERAGE_WINDOW_SIZE; i++) { client(node).execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_HEAP), listener); } - latch = new CountDownLatch(1); - listener = new ExceptionCatchingListener(latch); + listener = new ExceptionCatchingListener(); client(node).execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGHER_HEAP), listener); - latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS); + assertTrue(listener.latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS)); Exception caughtException = listener.getException(); assertNotNull("SearchShardTask should have been cancelled with TaskCancelledException", caughtException); - assertEquals(TaskCancelledException.class, caughtException.getClass()); - assertTrue(caughtException.getMessage().contains("heap usage exceeded")); + MatcherAssert.assertThat(caughtException, instanceOf(TaskCancelledException.class)); + MatcherAssert.assertThat(caughtException.getMessage(), containsString("heap usage exceeded")); } public void testSearchCancellationWithBackpressureDisabled() throws InterruptedException { Settings request = Settings.builder().put(SearchBackpressureSettings.SETTING_MODE.getKey(), "monitor_only").build(); assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); - CountDownLatch latch = new CountDownLatch(1); - ExceptionCatchingListener listener = new ExceptionCatchingListener(latch); + ExceptionCatchingListener listener = new ExceptionCatchingListener(); client().execute(TestTransportAction.ACTION, new TestRequest(RequestType.HIGH_ELAPSED_TIME), listener); - latch.await(TIMEOUT.getSeconds(), TimeUnit.SECONDS); + // waiting for the TIMEOUT * 3 time for the request to complete and the latch to countdown. + assertTrue( + "SearchShardTask should have been completed by now and countdown the latch", + listener.latch.await(TIMEOUT.getSeconds() * 3, TimeUnit.SECONDS) + ); Exception caughtException = listener.getException(); assertNull("SearchShardTask shouldn't have cancelled for monitor_only mode", caughtException); - } private static class ExceptionCatchingListener implements ActionListener { private final CountDownLatch latch; private Exception exception = null; - public ExceptionCatchingListener(CountDownLatch latch) { - this.latch = latch; + public ExceptionCatchingListener() { + this.latch = new CountDownLatch(1); } @Override