diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java index 78e09fa1e..09ba6ec9f 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerSubscriberTest.java @@ -79,6 +79,8 @@ public class ShardConsumerSubscriberTest { private static final String TERMINAL_MARKER = "Terminal"; + private static final long DEFAULT_NOTIFIER_TIMEOUT = 5000L; + private final RequestDetails lastSuccessfulRequestDetails = new RequestDetails(); @Mock @@ -125,10 +127,7 @@ public void singleItemTest() throws Exception { setupNotifierAnswer(1); - synchronized (processedNotifier) { - subscriber.startSubscriptions(); - processedNotifier.wait(5000); - } + startSubscriptionsAndWait(); verify(shardConsumer).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); } @@ -139,10 +138,7 @@ public void multipleItemTest() throws Exception { setupNotifierAnswer(recordsPublisher.responses.size()); - synchronized (processedNotifier) { - subscriber.startSubscriptions(); - processedNotifier.wait(5000); - } + startSubscriptionsAndWait(); verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); } @@ -171,10 +167,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable { } }).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); - synchronized (processedNotifier) { - subscriber.startSubscriptions(); - processedNotifier.wait(5000); - } + startSubscriptionsAndWait(); assertThat(subscriber.getAndResetDispatchFailure(), equalTo(testException)); assertThat(subscriber.getAndResetDispatchFailure(), nullValue()); @@ -192,10 +185,7 @@ public void onErrorStopsProcessingTest() throws Exception { setupNotifierAnswer(10); - synchronized (processedNotifier) { - subscriber.startSubscriptions(); - processedNotifier.wait(5000); - } + startSubscriptionsAndWait(); for (int attempts = 0; attempts < 10; attempts++) { if (subscriber.retrievalFailure() != null) { @@ -220,10 +210,7 @@ public void restartAfterErrorTest() throws Exception { setupNotifierAnswer(10); - synchronized (processedNotifier) { - subscriber.startSubscriptions(); - processedNotifier.wait(5000); - } + startSubscriptionsAndWait(); for (int attempts = 0; attempts < 10; attempts++) { if (subscriber.retrievalFailure() != null) { @@ -236,7 +223,7 @@ public void restartAfterErrorTest() throws Exception { synchronized (processedNotifier) { assertThat(subscriber.healthCheck(100000), equalTo(expected)); - processedNotifier.wait(5000); + processedNotifier.wait(DEFAULT_NOTIFIER_TIMEOUT); } assertThat(recordsPublisher.restartedFrom, equalTo(edgeRecord)); @@ -267,10 +254,7 @@ public void restartAfterRequestTimerExpiresTest() throws Exception { return null; }).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); - synchronized (processedNotifier) { - subscriber.startSubscriptions(); - processedNotifier.wait(5000); - } + startSubscriptionsAndWait(); synchronized (processedNotifier) { executorService.execute(() -> { @@ -290,7 +274,7 @@ public void restartAfterRequestTimerExpiresTest() throws Exception { // // Wait for our blocking thread to control the thread in the executor. // - processedNotifier.wait(5000); + processedNotifier.wait(DEFAULT_NOTIFIER_TIMEOUT); } Stream.iterate(2, i -> i + 1).limit(97).forEach(this::addUniqueItem); @@ -301,7 +285,7 @@ public void restartAfterRequestTimerExpiresTest() throws Exception { assertThat(subscriber.healthCheck(1), nullValue()); barrier.await(500, TimeUnit.MILLISECONDS); - processedNotifier.wait(5000); + processedNotifier.wait(DEFAULT_NOTIFIER_TIMEOUT); } verify(shardConsumer, times(100)).handleInput(argThat(eqProcessRecordsInput(processRecordsInput)), any(Subscription.class)); @@ -337,9 +321,7 @@ public void restartAfterRequestTimerExpiresWhenNotGettingRecordsAfterInitializat }).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); // First try to start subscriptions. - synchronized (processedNotifier) { - subscriber.startSubscriptions(); - } + startSubscriptionsAndWait(100); // Verifying that there are no interactions with shardConsumer mock indicating no records were sent back and // subscription has not started correctly. @@ -397,9 +379,7 @@ public void restartAfterRequestTimerExpiresWhenInitialTaskExecutionIsRejected() }).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); // First try to start subscriptions. - synchronized (processedNotifier) { - subscriber.startSubscriptions(); - } + startSubscriptionsAndWait(100); // Verifying that there are no interactions with shardConsumer mock indicating no records were sent back and // subscription has not started correctly. @@ -478,6 +458,17 @@ public Object answer(InvocationOnMock invocation) throws Throwable { }).when(shardConsumer).handleInput(any(ProcessRecordsInput.class), any(Subscription.class)); } + private void startSubscriptionsAndWait() throws InterruptedException { + startSubscriptionsAndWait(DEFAULT_NOTIFIER_TIMEOUT); + } + + private void startSubscriptionsAndWait(long timeout) throws InterruptedException { + synchronized (processedNotifier) { + subscriber.startSubscriptions(); + processedNotifier.wait(timeout); + } + } + private class ResponseItem { private final RecordsRetrieved recordsRetrieved; private final Throwable throwable;