diff --git a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/dms/TestDatastreamResources.java b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/dms/TestDatastreamResources.java index fa8f98f86..8f3ff03c1 100644 --- a/datastream-server-restli/src/test/java/com/linkedin/datastream/server/dms/TestDatastreamResources.java +++ b/datastream-server-restli/src/test/java/com/linkedin/datastream/server/dms/TestDatastreamResources.java @@ -17,7 +17,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -296,24 +295,16 @@ public void testSimultaneousStopDatastreamRequests() { PathKeys pathKey = Mockito.mock(PathKeys.class); Mockito.when(pathKey.getAsString(DatastreamResources.KEY_NAME)).thenReturn(datastreamName); - // Stop datastream. Assert.assertEquals(resource1.get(datastreamName).getStatus(), DatastreamStatus.READY); + // Stop datastream. ExecutorService executor = Executors.newFixedThreadPool(2); executor.execute(() -> resource1.stop(pathKey, false)); - executor.execute(() -> resource1.stop(pathKey, false)); - - try { - executor.awaitTermination(100, TimeUnit.MILLISECONDS); - } catch (InterruptedException ignored) { - } finally { - executor.shutdownNow(); - } + executor.execute(() -> resource2.stop(pathKey, false)); // Retrieve datastream and check that is in STOPPED state. - Datastream ds = resource2.get(datastreamName); - Assert.assertNotNull(ds); - Assert.assertEquals(ds.getStatus(), DatastreamStatus.STOPPED); + Assert.assertTrue(PollUtils.poll(() -> resource1.get(datastreamName).getStatus() == DatastreamStatus.STOPPED, 100, 1000)); + executor.shutdownNow(); } @Test